handle consul retries down in DynamicSettings
so that everyone gets the benefits, not just instfs also include a new circuit breaker so that if consul is unresponsive for more than the retry interval, we just let failures through quickly for a while Change-Id: I9ba757c8529c1011ca771612f592f289c6a844b6 Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/270789 Tested-by: Service Cloud Jenkins <svc.cloudjenkins@instructure.com> Reviewed-by: Simon Williams <simon@instructure.com> QA-Review: Cody Cutrer <cody@instructure.com> Product-Review: Cody Cutrer <cody@instructure.com>
This commit is contained in:
parent
41a2f28849
commit
b5945df961
|
@ -23,5 +23,5 @@ Gem::Specification.new do |spec|
|
|||
|
||||
spec.add_development_dependency 'bundler'
|
||||
spec.add_development_dependency 'rspec'
|
||||
|
||||
spec.add_development_dependency 'byebug'
|
||||
end
|
||||
|
|
|
@ -22,6 +22,7 @@ require 'active_support'
|
|||
require 'active_support/core_ext'
|
||||
require 'config_file'
|
||||
require 'diplomat'
|
||||
require 'dynamic_settings/circuit_breaker'
|
||||
require 'dynamic_settings/memory_cache'
|
||||
require 'dynamic_settings/fallback_proxy'
|
||||
require 'dynamic_settings/prefix_proxy'
|
||||
|
@ -35,7 +36,7 @@ module DynamicSettings
|
|||
class << self
|
||||
attr_accessor :environment
|
||||
attr_reader :fallback_data, :use_consul, :config
|
||||
attr_writer :fallback_recovery_lambda, :cache, :logger
|
||||
attr_writer :fallback_recovery_lambda, :retry_lambda, :cache, :logger
|
||||
|
||||
def config=(conf_hash)
|
||||
@config = conf_hash
|
||||
|
@ -58,6 +59,7 @@ module DynamicSettings
|
|||
@default_service = conf_hash.fetch('service', :canvas)
|
||||
@cache = conf_hash.fetch('cache', ::DynamicSettings::MemoryCache.new)
|
||||
@fallback_recovery_lambda = conf_hash.fetch('fallback_recovery_lambda', nil)
|
||||
@retry_lambda = conf_hash.fetch('retry_lambda', nil)
|
||||
@logger = conf_hash.fetch('logger', nil)
|
||||
else
|
||||
@environment = nil
|
||||
|
@ -76,9 +78,11 @@ module DynamicSettings
|
|||
end
|
||||
|
||||
def on_fallback_recovery(exception)
|
||||
if @fallback_recovery_lambda.present?
|
||||
@fallback_recovery_lambda.call(exception)
|
||||
end
|
||||
@fallback_recovery_lambda&.call(exception)
|
||||
end
|
||||
|
||||
def on_retry(exception)
|
||||
@retry_lambda&.call(exception)
|
||||
end
|
||||
|
||||
def on_reload!
|
||||
|
@ -132,7 +136,10 @@ module DynamicSettings
|
|||
cluster: cluster,
|
||||
default_ttl: default_ttl,
|
||||
data_center: data_center || @data_center,
|
||||
query_logging: @config.fetch('query_logging', true)
|
||||
query_logging: @config.fetch('query_logging', true),
|
||||
retry_limit: @config.fetch('retry_limit', 1),
|
||||
retry_base: @config.fetch('retry_base', 1.4),
|
||||
circuit_breaker: @config.fetch('circuit_breaker', nil)
|
||||
)
|
||||
else
|
||||
proxy = root_fallback_proxy
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
# Copyright (C) 2021 - present Instructure, Inc.
|
||||
#
|
||||
# This file is part of Canvas.
|
||||
#
|
||||
# Canvas is free software: you can redistribute it and/or modify it under
|
||||
# the terms of the GNU Affero General Public License as published by the Free
|
||||
# Software Foundation, version 3 of the License.
|
||||
#
|
||||
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License along
|
||||
# with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
module DynamicSettings
|
||||
class CircuitBreaker
|
||||
def initialize(reset_interval = 1.minute)
|
||||
@reset_interval = reset_interval
|
||||
end
|
||||
|
||||
def tripped?
|
||||
@tripped_at && (Time.now.utc - @tripped_at) < @reset_interval
|
||||
end
|
||||
|
||||
def trip
|
||||
@tripped_at = Time.now.utc
|
||||
end
|
||||
end
|
||||
end
|
|
@ -24,7 +24,7 @@ module DynamicSettings
|
|||
DEFAULT_TTL = 5.minutes
|
||||
# The TTL for cached values if none is specified in the constructor
|
||||
|
||||
attr_reader :prefix, :tree, :service, :environment, :cluster
|
||||
attr_reader :prefix, :tree, :service, :environment, :cluster, :retry_limit, :retry_base, :circuit_breaker
|
||||
attr_accessor :query_logging
|
||||
|
||||
# Build a new prefix proxy
|
||||
|
@ -45,7 +45,10 @@ module DynamicSettings
|
|||
cluster: nil,
|
||||
default_ttl: DEFAULT_TTL,
|
||||
data_center: nil,
|
||||
query_logging: true)
|
||||
query_logging: true,
|
||||
retry_limit: nil,
|
||||
retry_base: nil,
|
||||
circuit_breaker: nil)
|
||||
@prefix = prefix
|
||||
@tree = tree
|
||||
@service = service
|
||||
|
@ -54,6 +57,9 @@ module DynamicSettings
|
|||
@default_ttl = default_ttl
|
||||
@data_center = data_center
|
||||
@query_logging = query_logging
|
||||
@retry_limit = retry_limit
|
||||
@retry_base = retry_base
|
||||
@circuit_breaker = circuit_breaker
|
||||
end
|
||||
|
||||
def cache
|
||||
|
@ -77,6 +83,8 @@ module DynamicSettings
|
|||
unknown_kwargs = kwargs.keys - [:failsafe]
|
||||
raise ArgumentError, "unknown keyword(s): #{unknown_kwargs.map(&:inspect).join(', ')}" unless unknown_kwargs.empty?
|
||||
|
||||
retry_count = 1
|
||||
|
||||
keys = [
|
||||
full_key(key),
|
||||
[tree, service, environment, prefix, key].compact.join("/"),
|
||||
|
@ -94,65 +102,84 @@ module DynamicSettings
|
|||
return result if result
|
||||
end
|
||||
|
||||
# okay now pre-cache an entire tree
|
||||
tree_key = [tree, service, environment].compact.join("/")
|
||||
# This longer TTL is important for race condition for now.
|
||||
# if the tree JUST expired, we don't want to find
|
||||
# a valid tree, and then no valid subkeys, that makes
|
||||
# nils start popping up in the cache. Subkeys should
|
||||
# last much longer than it takes to notice the tree key is
|
||||
# expired and trying to replace it. When the tree writes
|
||||
# are fully atomic, this is much less of a concern,
|
||||
# we could have one ttl again
|
||||
subtree_ttl = ttl * 2
|
||||
cache.fetch(CACHE_KEY_PREFIX + tree_key + '/', expires_in: ttl) do
|
||||
values = kv_fetch(tree_key, recurse: true, stale: true)
|
||||
if values.nil?
|
||||
# no sense trying to populate the subkeys
|
||||
# when there's no tree
|
||||
nil
|
||||
else
|
||||
populate_cache(values, subtree_ttl)
|
||||
values
|
||||
end
|
||||
end
|
||||
|
||||
keys.each do |full_key|
|
||||
# these keys will have been populated (or not!) above
|
||||
cache_result = cache.fetch(CACHE_KEY_PREFIX + full_key, expires_in: subtree_ttl) do
|
||||
# this should rarely happen. If we JUST populated the parent tree,
|
||||
# the value will already by in the cache. If it's NOT in the tree, we'll cache
|
||||
# a nil (intentionally) and not hit this fetch over and over. This protects us
|
||||
# from the race condition where we just expired and filled out the whole tree,
|
||||
# then the cache gets cleared, then we try to fetch one of the things we "know"
|
||||
# is in the cache now. It's better to fall back to asking consul in those cases.
|
||||
# these values will still get overwritten the next time the parent tree expires,
|
||||
# and they'll still go away eventually if we REMOVE a key from a subtree in consul.
|
||||
kv_fetch(full_key, stale: true)
|
||||
end
|
||||
return cache_result if cache_result
|
||||
end
|
||||
|
||||
fallback_keys.each do |full_key|
|
||||
result = cache.fetch(CACHE_KEY_PREFIX + full_key, expires_in: ttl) do
|
||||
kv_fetch(full_key, stale: true)
|
||||
end
|
||||
return result if result
|
||||
end
|
||||
DynamicSettings.logger.warn("[DYNAMIC_SETTINGS] config requested which was found no-where (#{key})")
|
||||
nil
|
||||
rescue Diplomat::KeyNotFound, Diplomat::UnknownStatus, Diplomat::PathNotFound, Errno::ECONNREFUSED => e
|
||||
if cache.respond_to?(:fetch_without_expiration)
|
||||
cache.fetch_without_expiration(CACHE_KEY_PREFIX + keys.first).tap do |val|
|
||||
if val
|
||||
DynamicSettings.on_fallback_recovery(e)
|
||||
return val
|
||||
begin
|
||||
# okay now pre-cache an entire tree
|
||||
tree_key = [tree, service, environment].compact.join("/")
|
||||
# This longer TTL is important for race condition for now.
|
||||
# if the tree JUST expired, we don't want to find
|
||||
# a valid tree, and then no valid subkeys, that makes
|
||||
# nils start popping up in the cache. Subkeys should
|
||||
# last much longer than it takes to notice the tree key is
|
||||
# expired and trying to replace it. When the tree writes
|
||||
# are fully atomic, this is much less of a concern,
|
||||
# we could have one ttl again
|
||||
subtree_ttl = ttl * 2
|
||||
cache.fetch(CACHE_KEY_PREFIX + tree_key + '/', expires_in: ttl) do
|
||||
values = kv_fetch(tree_key, recurse: true, stale: true)
|
||||
if values.nil?
|
||||
# no sense trying to populate the subkeys
|
||||
# when there's no tree
|
||||
nil
|
||||
else
|
||||
populate_cache(values, subtree_ttl)
|
||||
values
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
return kwargs[:failsafe] if kwargs.key?(:failsafe)
|
||||
raise
|
||||
keys.each do |full_key|
|
||||
# these keys will have been populated (or not!) above
|
||||
cache_result = cache.fetch(CACHE_KEY_PREFIX + full_key, expires_in: subtree_ttl) do
|
||||
# this should rarely happen. If we JUST populated the parent tree,
|
||||
# the value will already by in the cache. If it's NOT in the tree, we'll cache
|
||||
# a nil (intentionally) and not hit this fetch over and over. This protects us
|
||||
# from the race condition where we just expired and filled out the whole tree,
|
||||
# then the cache gets cleared, then we try to fetch one of the things we "know"
|
||||
# is in the cache now. It's better to fall back to asking consul in those cases.
|
||||
# these values will still get overwritten the next time the parent tree expires,
|
||||
# and they'll still go away eventually if we REMOVE a key from a subtree in consul.
|
||||
kv_fetch(full_key, stale: true)
|
||||
end
|
||||
return cache_result if cache_result
|
||||
end
|
||||
|
||||
fallback_keys.each do |full_key|
|
||||
result = cache.fetch(CACHE_KEY_PREFIX + full_key, expires_in: ttl) do
|
||||
kv_fetch(full_key, stale: true)
|
||||
end
|
||||
return result if result
|
||||
end
|
||||
DynamicSettings.logger.warn("[DYNAMIC_SETTINGS] config requested which was found no-where (#{key})")
|
||||
nil
|
||||
rescue Diplomat::KeyNotFound, Diplomat::UnknownStatus, Diplomat::PathNotFound, Errno::ECONNREFUSED => e
|
||||
if cache.respond_to?(:fetch_without_expiration)
|
||||
cache.fetch_without_expiration(CACHE_KEY_PREFIX + keys.first).tap do |val|
|
||||
if val
|
||||
DynamicSettings.on_fallback_recovery(e)
|
||||
return val
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
return kwargs[:failsafe] if kwargs.key?(:failsafe)
|
||||
|
||||
if retry_limit && retry_base && retry_count < retry_limit && !circuit_breaker&.tripped?
|
||||
# capture this to make sure that we have SOME
|
||||
# signal that the problem is continuing, even if our
|
||||
# retries are all successful.
|
||||
DynamicSettings.on_retry(e)
|
||||
|
||||
backoff_interval = retry_base ** retry_count
|
||||
retry_count += 1
|
||||
DynamicSettings.logger.warn("[DYNAMIC_SETTINGS] Consul error; retrying in #{backoff_interval} seconds...")
|
||||
sleep(backoff_interval)
|
||||
retry
|
||||
end
|
||||
|
||||
# retries failed; trip the circuit breaker and avoid retries for some amount of time
|
||||
circuit_breaker&.trip
|
||||
|
||||
raise
|
||||
end
|
||||
end
|
||||
alias [] fetch
|
||||
|
||||
|
|
|
@ -108,6 +108,53 @@ module DynamicSettings
|
|||
expect(Diplomat::Kv).to receive(:get).with('global/foo/bar/baz', { stale: true }).and_return(42).ordered
|
||||
expect(proxy.fetch('baz')).to eq 42
|
||||
end
|
||||
|
||||
context "with retries" do
|
||||
before do
|
||||
allow(proxy).to receive(:retry_limit).and_return(2)
|
||||
allow(proxy).to receive(:retry_base).and_return(1.4)
|
||||
end
|
||||
|
||||
it "retries if there is an initial error" do
|
||||
expect(Diplomat::Kv).to receive(:get_all).and_raise(Diplomat::KeyNotFound).ordered
|
||||
expect(proxy).to receive(:sleep).with(1.4)
|
||||
expect(Diplomat::Kv).to receive(:get_all).and_return([]).ordered
|
||||
allow(Diplomat::Kv).to receive(:get).with('foo/bar/baz', { stale: true }).and_return('qux')
|
||||
|
||||
expect(proxy.fetch('baz')).to eq 'qux'
|
||||
end
|
||||
|
||||
it "still raises errors if retries fail" do
|
||||
expect(Diplomat::Kv).to receive(:get_all).and_raise(Diplomat::KeyNotFound).twice
|
||||
expect(proxy).to receive(:sleep).with(1.4)
|
||||
|
||||
expect { proxy.fetch('baz') }.to raise_error(Diplomat::KeyNotFound)
|
||||
end
|
||||
|
||||
context "with circuit breaker" do
|
||||
let(:circuit_breaker) { CircuitBreaker.new }
|
||||
|
||||
before do
|
||||
allow(proxy).to receive(:circuit_breaker).and_return(circuit_breaker)
|
||||
end
|
||||
|
||||
it "fails immediately if the circuit breaker is tripped" do
|
||||
allow(circuit_breaker).to receive(:tripped?).and_return(true)
|
||||
expect(Diplomat::Kv).to receive(:get_all).and_raise(Diplomat::KeyNotFound)
|
||||
expect(proxy).not_to receive(:sleep)
|
||||
|
||||
expect { proxy.fetch('baz') }.to raise_error(Diplomat::KeyNotFound)
|
||||
end
|
||||
|
||||
it "trips the circuit breaker" do
|
||||
expect(Diplomat::Kv).to receive(:get_all).and_raise(Diplomat::KeyNotFound).twice
|
||||
expect(proxy).to receive(:sleep).with(1.4)
|
||||
|
||||
expect { proxy.fetch('baz') }.to raise_error(Diplomat::KeyNotFound)
|
||||
expect(circuit_breaker).to be_tripped
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe 'for_prefix(prefix_extension, default_ttl: @default_ttl)' do
|
||||
|
|
|
@ -100,11 +100,6 @@ module AuthenticationMethods
|
|||
# and for some normal use cases (old token, access token),
|
||||
# so we can return and move on
|
||||
return
|
||||
rescue Diplomat::KeyNotFound => exception
|
||||
# Something went wrong in the Network
|
||||
# these are indications of infrastructure or data problems
|
||||
# so we should log them for resolution, but recover gracefully
|
||||
Canvas::Errors.capture_exception(:jwt_check, exception, :warn)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -30,7 +30,12 @@ module Canvas
|
|||
# before we start bootstrapping other things in the app.
|
||||
def self.bootstrap!
|
||||
settings = ConfigFile.load("consul")
|
||||
|
||||
if settings.present?
|
||||
settings[:retry_limit] = Setting.get('consul_retry_count', 1).to_i
|
||||
settings[:retry_base] = Setting.get('consul_retry_base_interval', 1.4).to_f
|
||||
settings[:circuit_breaker] = ::DynamicSettings::CircuitBreaker.new(Setting.get('consul_circuit_breaker_interval', 60).to_f)
|
||||
|
||||
begin
|
||||
::DynamicSettings.config = settings
|
||||
rescue Diplomat::KeyNotFound, Diplomat::PathNotFound, Diplomat::UnknownStatus
|
||||
|
@ -53,6 +58,7 @@ module Canvas
|
|||
# this got pulled out into a local gem
|
||||
::DynamicSettings.cache = LocalCache
|
||||
::DynamicSettings.fallback_recovery_lambda = ->(e){ Canvas::Errors.capture_exception(:consul, e, :warn) }
|
||||
::DynamicSettings.retry_lambda = ->(e){ Canvas::Errors.capture_exception(:consul, e, :warn) }
|
||||
::DynamicSettings.logger = Rails.logger
|
||||
end
|
||||
end
|
||||
|
|
|
@ -270,41 +270,8 @@ module InstFS
|
|||
end
|
||||
|
||||
private
|
||||
def setting(key)
|
||||
unsafe_setting(key)
|
||||
rescue Diplomat::KeyNotFound => e
|
||||
# capture this to make sure that we have SOME
|
||||
# signal that the problem is continuing, even if our
|
||||
# retries are all successful.
|
||||
Canvas::Errors.capture_exception(:inst_fs, e, :warn)
|
||||
Rails.logger.warn("[INST_FS] Consul timeout hit during settings #{e}, entering retry handling...")
|
||||
retry_limit = Setting.get("inst_fs_config_retry_count", "5").to_i
|
||||
retry_base = Setting.get("inst_fs_config_retry_base_interval", "1.4").to_i
|
||||
retry_count = 1
|
||||
return_value = nil
|
||||
currently_in_job = Delayed::Worker.current_job.present?
|
||||
while retry_count <= retry_limit
|
||||
begin
|
||||
return_value = unsafe_setting(key)
|
||||
break
|
||||
rescue Diplomat::KeyNotFound => e
|
||||
retry_count += 1
|
||||
# if we're not currently in a job, one retry is all you get,
|
||||
# fail for the user and move on.
|
||||
raise e if !currently_in_job || retry_count > retry_limit
|
||||
backoff_interval = retry_base ** retry_count
|
||||
Rails.logger.warn("[INST_FS] Consul timeout hit during settings, retrying in #{backoff_interval} seconds...")
|
||||
sleep(backoff_interval)
|
||||
end
|
||||
end
|
||||
return_value
|
||||
end
|
||||
|
||||
# this is just to provide a convenient way to wrap
|
||||
# accessing a setting in retries (see #setting),
|
||||
# it should not be used by the rest of the code,
|
||||
# inside this class or otherwise.
|
||||
def unsafe_setting(key)
|
||||
def setting(key)
|
||||
Canvas::DynamicSettings.find(service: "inst-fs", default_ttl: 5.minutes)[key]
|
||||
end
|
||||
|
||||
|
|
|
@ -670,12 +670,6 @@ describe "API Authentication", type: :request do
|
|||
expect(JSON.parse(response.body).size).to eq 1
|
||||
end
|
||||
|
||||
it "recovers gracefully if consul is missing encryption data" do
|
||||
allow(Diplomat::Kv).to receive(:get) { |key| raise Diplomat::KeyNotFound, key }
|
||||
check_used { get "/api/v1/courses", headers: { 'HTTP_AUTHORIZATION' => "Bearer #{@token.full_token}" } }
|
||||
assert_status(200)
|
||||
end
|
||||
|
||||
it "should allow passing the access token in the post body" do
|
||||
@me = @user
|
||||
Account.default.account_users.create!(user: @user)
|
||||
|
|
|
@ -136,24 +136,6 @@ describe InstFS do
|
|||
end
|
||||
end
|
||||
|
||||
it "retries if consul is erroring out" do
|
||||
times_called = 0
|
||||
allow(Canvas::DynamicSettings).to receive(:find).with(service: "inst-fs", default_ttl: 5.minutes) do
|
||||
times_called += 1
|
||||
raise Diplomat::KeyNotFound if times_called < 2
|
||||
settings_hash
|
||||
end
|
||||
expect(InstFS.authenticated_url(@attachment, {}))
|
||||
.to match("#{app_host}/files/#{@attachment.instfs_uuid}/#{@attachment.filename}")
|
||||
end
|
||||
|
||||
it "actually fails with a config error if can't find config" do
|
||||
allow(Canvas::DynamicSettings).to receive(:find).with(service: "inst-fs", default_ttl: 5.minutes) do
|
||||
raise Diplomat::KeyNotFound
|
||||
end
|
||||
expect{ InstFS.authenticated_url(@attachment, {}) }.to raise_error(Diplomat::KeyNotFound)
|
||||
end
|
||||
|
||||
describe "jwt claims" do
|
||||
def claims_for(options={})
|
||||
url = InstFS.authenticated_url(@attachment, options)
|
||||
|
|
Loading…
Reference in New Issue