lock vault for new lease
refs FOO-779

TEST PLAN:
  1) make many threads or processes try to load the same vault config
  2) only one should actually call vault
  3) they all should get a response

Change-Id: Ic5b778ab994bd964d930fd40fee936f42a4fd91d
Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/248155
Tested-by: Service Cloud Jenkins <svc.cloudjenkins@instructure.com>
Reviewed-by: Cody Cutrer <cody@instructure.com>
QA-Review: Cody Cutrer <cody@instructure.com>
Product-Review: Cody Cutrer <cody@instructure.com>
parent 2fc25f9fe1
commit 441290a4f8

@@ -19,6 +19,8 @@
 Bundler.require 'redis'

 class ActiveSupport::Cache::HaStore < ActiveSupport::Cache::RedisCacheStore
+  include ActiveSupport::Cache::SafeRedisRaceCondition
+
   def initialize(consul_datacenters: nil,
                  consul_event: nil,
                  **additional_options)

@@ -26,7 +28,6 @@ class ActiveSupport::Cache::HaStore < ActiveSupport::Cache::RedisCacheStore
     options[:lock_timeout] ||= 5
     options[:consul_datacenters] = consul_datacenters
     options[:consul_event] = consul_event
-    @delif = Redis::Scripting::Script.new(File.expand_path("../delif.lua", __FILE__))
   end

   def clear

@@ -77,79 +78,4 @@ class ActiveSupport::Cache::HaStore < ActiveSupport::Cache::RedisCacheStore
     end
   end

-  def handle_expired_entry(entry, key, options)
-    return super unless options[:race_condition_ttl]
-    lock_key = "lock:#{key}"
-
-    unless entry
-      while !entry
-        unless (lock_nonce = lock(lock_key, options))
-          # someone else is already generating it; wait for them
-          sleep 0.1
-          entry = read_entry(key, options)
-          next
-        else
-          options[:lock_nonce] = lock_nonce
-          break
-        end
-      end
-      entry
-    else
-      if entry.expired? && (lock_nonce = lock(lock_key, options))
-        options[:lock_nonce] = lock_nonce
-        options[:stale_entry] = entry
-        return nil
-      end
-      # just return the stale value; someone else is busy
-      # regenerating it
-      entry
-    end
-  end
-
-  def save_block_result_to_cache(name, options)
-    super
-  rescue => e
-    raise unless options[:stale_entry]
-    # if we have old stale data, silently swallow any
-    # errors fetching fresh data, and return the stale entry
-    Canvas::Errors.capture(e)
-    return options[:stale_entry].value
-  ensure
-    # only unlock if we have an actual lock nonce, not just "true"
-    # that happens on failure
-    if options[:lock_nonce].is_a?(String)
-      key = normalize_key(name, options)
-      unlock("lock:#{key}", options[:lock_nonce])
-    end
-  end
-
-  def lock(key, options)
-    nonce = SecureRandom.hex(20)
-    case redis.set(key, nonce, raw: true, px: (options[:lock_timeout] * 1000).to_i, nx: true)
-    when true
-      nonce
-    when nil
-      # redis failed for reasons unknown; say "true" that we locked, but the
-      # nonce is useless
-      true
-    when false
-      false
-    end
-  end
-
-  def unlock(key, nonce)
-    raise ArgumentError("nonce can't be nil") unless nonce
-    node = redis
-    node = redis.node_for(key) if redis.is_a?(Redis::Distributed)
-    @delif.run(node, [key], [nonce])
-  end
-
-  # vanilla Rails is weird, and assumes "race_condition_ttl" is 5 minutes; override that to actually do math
-  def write_entry(key, entry, unless_exist: false, raw: false, expires_in: nil, race_condition_ttl: nil, **options)
-    if race_condition_ttl
-      expires_in += race_condition_ttl
-      race_condition_ttl = nil
-    end
-    super
-  end
 end

@@ -0,0 +1,137 @@
+#
+# Copyright (C) 2020 - present Instructure, Inc.
+#
+# This file is part of Canvas.
+#
+# Canvas is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, version 3 of the License.
+#
+# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+
+# SafeRedisRaceCondition is for handling the case
+# where some cache store needs to be able to "lock"
+# when someone asks for a given entry that has expired so that we don't have
+# multiple clients trying to do the same expensive regeneration.
+# An example might be regenerating credentials from vault for a whole box to use;
+# if 3 processes all find the current entry expired at the same time, you wouldn't
+# want them all to independently re-generate credentials (unnecessary traffic for both
+# vault and STS).
+#
+# This module assumes you are including it into something that eventually inherits from
+# an ActiveSupport::Cache::RedisStore, and overrides methods in that internal
+# implementation (depending on the existence of a redis client).
+module ActiveSupport::Cache::SafeRedisRaceCondition
+
+  # this is originally defined in ActiveSupport::Cache::Store,
+  # and that implementation DOES handle race_condition_ttl by rewriting the
+  # stale value back to the cache with a slightly extended expiry.
+  # It has no locking, though, so if the timing isn't quite right you can still get a few
+  # processes regenerating at the same time. This, instead, will ONLY override the method
+  # if the race_condition_ttl is defined, and in that case it will
+  # use a nonce as a lock value so it's easy to tell on unlock
+  # whether the lease has been re-issued
+  def handle_expired_entry(entry, key, options)
+    return super unless options[:race_condition_ttl]
+    lock_key = "lock:#{key}"
+
+    unless entry
+      while !entry
+        unless (lock_nonce = lock(lock_key, options))
+          # someone else is already generating it; wait for them
+          sleep 0.1
+          entry = read_entry(key, options)
+          next
+        else
+          options[:lock_nonce] = lock_nonce
+          break
+        end
+      end
+      entry
+    else
+      if entry.expired? && (lock_nonce = lock(lock_key, options))
+        options[:lock_nonce] = lock_nonce
+        options[:stale_entry] = entry
+        return nil
+      end
+      # just return the stale value; someone else is busy
+      # regenerating it
+      entry
+    end
+  end
+
+  # this is originally defined in ActiveSupport::Cache::Store,
+  # we only override it to make sure we can recover if
+  # we have stale data available, and to make sure unlocking happens
+  # no matter what (even if the block dies)
+  def save_block_result_to_cache(name, options)
+    super
+  rescue => e
+    raise unless options[:stale_entry]
+    # if we have old stale data, silently swallow any
+    # errors fetching fresh data, and return the stale entry
+    Canvas::Errors.capture(e)
+    return options[:stale_entry].value
+  ensure
+    # only unlock if we have an actual lock nonce, not just "true"
+    # that happens on failure
+    if options[:lock_nonce].is_a?(String)
+      key = normalize_key(name, options)
+      unlock("lock:#{key}", options[:lock_nonce])
+    end
+  end
+
+  # lock is unique to this implementation, it's not a standard part of
+  # rails caches. Pass a key to lock and you'll get back a nonce if you
+  # hold the lease. You need to retain the nonce to unlock later, but the lock timeout
+  # will make sure it gets released eventually.
+  def lock(key, options)
+    nonce = SecureRandom.hex(20)
+    lock_timeout = options.fetch(:lock_timeout, 5).to_i * 1000
+    case redis.set(key, nonce, raw: true, px: lock_timeout, nx: true)
+    when true
+      nonce
+    when nil
+      # redis failed for reasons unknown; say "true" that we locked, but the
+      # nonce is useless
+      true
+    when false
+      false
+    end
+  end
+
+  # unlock is unique to this implementation, it's not a standard part of
+  # rails caches. Pass the key you locked and the nonce you got back from lock.
+  # It deletes the lock, but only if the nonce that is passed still matches
+  # the one stored (so we don't release a lease that has since been re-issued).
+  def unlock(key, nonce)
+    raise ArgumentError("nonce can't be nil") unless nonce
+    node = redis
+    node = redis.node_for(key) if redis.is_a?(Redis::Distributed)
+    delif_script.run(node, [key], [nonce])
+  end
+
+  # redis does not have a built in "delete if"; this lua script
+  # is written to delete a key, but only if its value matches the
+  # provided value (so if someone else has re-written it since, we won't delete it)
+  def delif_script
+    @_delif ||= Redis::Scripting::Script.new(File.expand_path("../delif.lua", __FILE__))
+  end
+
+  # vanilla Rails is weird, and assumes "race_condition_ttl" is 5 minutes; override that to actually do math
+  def write_entry(key, entry, unless_exist: false, raw: false, expires_in: nil, race_condition_ttl: nil, **options)
+    if race_condition_ttl && expires_in
+      expires_in += race_condition_ttl
+      race_condition_ttl = nil
+    end
+    super
+  end
+end

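The delif.lua script itself is not part of this diff. As a rough sketch only, the compare-and-delete it performs looks like the Ruby below (a hypothetical, deliberately non-atomic stand-in, not the shipped script); the reason the real thing lives in a server-side Lua script is that redis then runs the get/compare/delete as one atomic step, so no other client can re-acquire the lock between the read and the delete.

    # Illustrative stand-in for delif.lua (assumption, not code from this commit).
    require 'redis'

    def delete_if_value_matches(redis, key, expected_nonce)
      # Non-atomic equivalent: another process could take the lock between these
      # two calls, which is exactly the race the server-side Lua script closes.
      redis.del(key) if redis.get(key) == expected_nonce
    end
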
@@ -28,7 +28,7 @@ module Canvas
   module FallbackExpirationCache
     KEY_SUFFIX = '__no_expire'.freeze

-    def fetch(*, expires_in: nil)
+    def fetch(*, expires_in: nil, race_condition_ttl: nil)
       return yield if expires_in == 0
       super
     end

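For context on the override above (a hedged illustration, not code from this change): an explicit expires_in of 0 still bypasses the cache entirely, and the widened signature simply lets race_condition_ttl flow through this prepended module to the underlying store. Here `load_value` stands in for whatever block regenerates the data.

    # Hypothetical calls against a configured LocalCache-backed store:
    LocalCache.fetch("vault/example/path", expires_in: 0) { load_value }   # never cached
    LocalCache.fetch("vault/example/path",
                     expires_in: 30.minutes,
                     race_condition_ttl: 60.seconds) { load_value }        # cached, stampede-guarded
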
@@ -17,6 +17,7 @@
 module Canvas
   module Cache
     class LocalRedisCache < ActiveSupport::Cache::RedisCacheStore
+      include ActiveSupport::Cache::SafeRedisRaceCondition
       include FallbackExpirationCache

       def initialize(local_cache_conf)

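A minimal usage sketch (an assumption for illustration, not part of the commit): `cache` below stands in for any store that mixes in ActiveSupport::Cache::SafeRedisRaceCondition, such as the LocalRedisCache above, `redis_conf_hash` for its configuration, and `regenerate_credentials` for the expensive block. With race_condition_ttl set, only the caller that wins the lock nonce runs the block; concurrent callers sleep in 0.1 second steps until the winner writes the entry, which is the behaviour the specs further down assert.

    cache = Canvas::Cache::LocalRedisCache.new(redis_conf_hash) # assumed config hash
    creds = cache.fetch("vault/test/path",
                        expires_in: 30.minutes,
                        race_condition_ttl: 60.seconds) do
      # Only the process holding the "lock:<key>" nonce reaches this block.
      regenerate_credentials
    end
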
@@ -18,26 +18,35 @@
 require 'vault'

 module Canvas::Vault
-  CACHE_KEY_PREFIX = 'vault/'.freeze
+  class MissingVaultProfile < StandardError; end

   class << self
+    CACHE_KEY_PREFIX = 'vault/'.freeze

     def read(path)
-      cached_val = LocalCache.fetch(CACHE_KEY_PREFIX + path)
-      return cached_val unless cached_val.nil?
-
-      begin
+      # we're going to override this anyway, just want it to use the fetch path.
+      default_expiry = 30.minutes
+      default_race_condition_ttl = Setting.get("vault_cache_race_condition_ttl", 60).to_i.seconds
+      cache_key = CACHE_KEY_PREFIX + path
+      fetched_lease_value = nil
+      cached_data = LocalCache.fetch(cache_key, expires_in: default_expiry, race_condition_ttl: default_race_condition_ttl) do
         vault_resp = api_client.logical.read(path)
-        return nil if vault_resp.nil?
-
-        token_ttl = vault_resp.lease_duration || vault_resp.data[:ttl] || 10.minutes
-        cache_ttl = token_ttl / 2
-        LocalCache.write(CACHE_KEY_PREFIX + path, vault_resp.data, expires_in: cache_ttl)
-
-        return vault_resp.data
-      rescue => exception
-        Canvas::Errors.capture_exception(:vault, exception)
-        return LocalCache.fetch_without_expiration(CACHE_KEY_PREFIX + path)
+        raise(MissingVaultProfile, "nil credentials found for #{path}") if vault_resp.nil?
+        fetched_lease_value = vault_resp.lease_duration || vault_resp.data[:ttl] || 10.minutes
+        vault_resp.data
       end
+      unless fetched_lease_value.nil?
+        # we actually talked to vault and got a new record, let's update the expiration information
+        # so we're actually sensitive to the data in the lease
+        cache_ttl = fetched_lease_value / 2
+        LocalCache.write(cache_key, cached_data, expires_in: cache_ttl)
+      end
+      return cached_data
+    rescue => exception
+      Canvas::Errors.capture_exception(:vault, exception)
+      stale_value = LocalCache.fetch_without_expiration(CACHE_KEY_PREFIX + path)
+      return stale_value if stale_value.present?
+      # if we can't serve any stale value, we're better off erroring than handing back nil
+      raise
+    end

     def api_client

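The net effect of the lease handling above: the cached entry's expiry is rewritten to half of whatever lease vault reports, so the data is refreshed well before the credentials themselves expire. A rough worked example with made-up numbers:

    # Hypothetical response: vault reports lease_duration: 7200 (two hours).
    fetched_lease_value = 7200
    cache_ttl = fetched_lease_value / 2   # => 3600 seconds
    # LocalCache.write(cache_key, cached_data, expires_in: cache_ttl) then replaces
    # the 30.minutes default_expiry placeholder used on the initial fetch.
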
@@ -13,6 +13,7 @@
 #
 # You should have received a copy of the GNU Affero General Public License along
 # with this program. If not, see <http://www.gnu.org/licenses/>.

 require 'spec_helper'
+require_dependency "canvas/cache/local_redis_cache"

@@ -45,6 +46,10 @@ module Canvas
         @fast_cache.clear
       end

+      def new_redis_client
+        LocalRedisCache.new(redis_conf_hash)
+      end
+
       it "writes sets of keys atomically" do
         data_set = {
           "keya" => "vala",

@@ -74,6 +79,43 @@ module Canvas
         slow_thread.join
         expect(read_set == data_set).to be_truthy
       end
+
+      it "handles concurrent traffic" do
+        lock_taken = false
+        cache_key = "some-cache-key"
+        cache_value = "THE VALUE"
+        v1 = v2 = v3 = nil
+        # using threads, but really testing against
+        # many processes talking to the same redis
+        t1 = Thread.new do
+          c1 = new_redis_client
+          v1 = c1.fetch(cache_key, expires_in: 30.seconds, race_condition_ttl: 10.seconds) do
+            lock_taken = true
+            sleep(0.2) # allow other threads to run and try for lock
+            cache_value.dup
+          end
+        end
+        t2 = Thread.new do
+          c2 = new_redis_client
+          sleep(0.1) until lock_taken # make sure t1 goes first
+          v2 = c2.fetch(cache_key, expires_in: 30.seconds, race_condition_ttl: 10.seconds) do
+            raise RuntimeError, "should have waited for t1"
+          end
+        end
+        t3 = Thread.new do
+          c3 = new_redis_client
+          sleep(0.1) until lock_taken # make sure t1 goes first
+          v3 = c3.fetch(cache_key, expires_in: 30.seconds, race_condition_ttl: 10.seconds) do
+            raise RuntimeError, "should have waited for t1"
+          end
+        end
+        t1.join
+        t2.join
+        t3.join
+        expect(v1).to eq(cache_value)
+        expect(v2).to eq(cache_value)
+        expect(v3).to eq(cache_value)
+      end
     end
   end
 end

@@ -26,13 +26,13 @@ module Canvas
     let(:token_path){ '/path/to/token' }
     let(:addr) { 'http://vault:8200' }
     let(:addr_path) { '/path/to/addr' }
-    let(:static_config) {
+    let(:static_config) {
       {
         token: token,
         addr: addr,
         kv_mount: 'app-canvas'
       } }
-    let(:path_config) {
+    let(:path_config) {
       {
         token_path: token_path,
         addr_path: addr_path,

@@ -46,6 +46,7 @@ module Canvas
     end

     after do
+      LocalCache.clear
       WebMock.enable_net_connect!
     end

@@ -72,55 +73,38 @@ module Canvas
     end

     describe '.read' do
-      it 'Caches the read' do
+      before(:each) do
         allow(described_class).to receive(:config).and_return(static_config)
-        stub = stub_request(:get, "#{addr}/v1/test/path").
+        @stub = stub_request(:get, "#{addr}/v1/test/path").
           to_return(status: 200, body: {
             data: {
               foo: 'bar'
             },
             lease_duration: 3600,
           }.to_json, headers: { 'content-type': 'application/json' })
-
-        expect(described_class.read('test/path')).to eq({ foo: 'bar' })
-        expect(stub).to have_been_requested.times(1)
-        # uses the cache
-        expect(described_class.read('test/path')).to eq({ foo: 'bar' })
-        expect(stub).to have_been_requested.times(1)
       end

+      it 'Caches the read' do
+        expect(described_class.read('test/path')).to eq({ foo: 'bar' })
+        expect(@stub).to have_been_requested.times(1)
+        # uses the cache
+        expect(described_class.read('test/path')).to eq({ foo: 'bar' })
+        expect(@stub).to have_been_requested.times(1)
+      end
+
       it 'Caches the read for less than the lease_duration' do
-        allow(described_class).to receive(:config).and_return(static_config)
-        stub = stub_request(:get, "#{addr}/v1/test/path").
-          to_return(status: 200, body: {
-            data: {
-              foo: 'bar'
-            },
-            lease_duration: 3600,
-          }.to_json, headers: { 'content-type': 'application/json' })
-
         expect(described_class.read('test/path')).to eq({ foo: 'bar' })
-        expect(stub).to have_been_requested.times(1)
+        expect(@stub).to have_been_requested.times(1)
         # does not use the cache
         Timecop.travel(Time.zone.now + 3600.seconds) do
           expect(described_class.read('test/path')).to eq({ foo: 'bar' })
-          expect(stub).to have_been_requested.times(2)
+          expect(@stub).to have_been_requested.times(2)
         end
       end

       it 'Uses the cache if vault is unavailible' do
-        allow(described_class).to receive(:config).and_return(static_config)
-        stub = stub_request(:get, "#{addr}/v1/test/path").
-          to_return(status: 200, body: {
-            data: {
-              foo: 'bar'
-            },
-            lease_duration: 3600,
-          }.to_json, headers: { 'content-type': 'application/json' })
-
         expect(described_class.read('test/path')).to eq({ foo: 'bar' })
-        expect(stub).to have_been_requested.times(1)
+        expect(@stub).to have_been_requested.times(1)
         # restub to return an error now
         stub_request(:get, "#{addr}/v1/test/path").to_return(status: 500, body: 'error')
         Timecop.travel(Time.zone.now + 3600.seconds) do

@@ -142,6 +126,53 @@ module Canvas
         result = described_class.read(cred_path)
         expect(result[:security_token]).to eq("fake-security-token")
       end
+
+      describe 'locking and loading' do
+        let(:credential_path){ 'test/vault/creds/path' }
+        let(:lease_duration){ 3600 }
+        let(:credential_data){ { credential_id: 'aabbccdd', credential_secret: 'pampelmousse' } }
+        before(:each) do
+          skip("Must have a local redis available to run this spec") unless Canvas.redis_enabled?
+          allow(ConfigFile).to receive(:load).with("local_cache").and_return({
+            store: "redis",
+            redis_host: "redis",
+            redis_port: 6379,
+            redis_db: 6 # intentionally one probably not used elsewhere
+          })
+          @lock_stub = stub_request(:get, "#{addr}/v1/#{credential_path}").
+            to_return(status: 200, body: {
+              data: credential_data,
+              lease_duration: lease_duration,
+            }.to_json, headers: { 'content-type': 'application/json' })
+        end
+
+        it "will queue if the lock is taken and there is no value in the cache" do
+          expect(LocalCache.fetch(::Canvas::Vault::CACHE_KEY_PREFIX + credential_path)).to be_nil
+          t1_val = t2_val = t3_val = t4_val = nil
+          threads = [
+            Thread.new { t1_val = described_class.read(credential_path) },
+            Thread.new { t2_val = described_class.read(credential_path) },
+            Thread.new { t3_val = described_class.read(credential_path) },
+            Thread.new { t4_val = described_class.read(credential_path) }
+          ]
+          threads.each{|t| t.join }
+          expect(t1_val).to eq(credential_data)
+          expect(t2_val).to eq(credential_data)
+          expect(t3_val).to eq(credential_data)
+          expect(t4_val).to eq(credential_data)
+          expect(@lock_stub).to have_been_requested.times(1)
+        end
+
+        it "respects the lease duration for expiration" do
+          cache_key = ::Canvas::Vault::CACHE_KEY_PREFIX + credential_path
+          expect(LocalCache.fetch(cache_key)).to be_nil
+          result = described_class.read(credential_path)
+          cache_entry = LocalCache.cache.send(:read_entry, cache_key, {})
+          expiry_approximate = Time.now.utc.to_i + (lease_duration / 2)
+          expiry_delta = (cache_entry.expires_at - expiry_approximate).abs
+          expect(expiry_delta.abs < 30).to be_truthy
+        end
+      end
     end
   end
 end

@@ -43,6 +43,11 @@ describe LocalCache
     skip("Must have a local redis available to run this spec") unless Canvas.redis_enabled?
     allow(ConfigFile).to receive(:load).with("local_cache").and_return(redis_conf_hash)
+    LocalCache.reset
+    LocalCache.clear
   end

+  after(:each) do
+    LocalCache.clear
+  end
+
   it "uses a redis store" do