mirror of https://github.com/rails/rails
Merge pull request #50938 from Shopify/refactor-query-cache-to-pool
Refactor QueryCache to be owned by the pool
This commit is contained in:
commit
13c1dfe4e0
|
@ -113,7 +113,7 @@ module ActiveRecord
|
|||
# are now explicitly documented
|
||||
class ConnectionPool
|
||||
include MonitorMixin
|
||||
include QueryCache::ConnectionPoolConfiguration
|
||||
prepend QueryCache::ConnectionPoolConfiguration
|
||||
include ConnectionAdapters::AbstractPool
|
||||
|
||||
attr_accessor :automatic_reconnect, :checkout_timeout
|
||||
|
@ -176,13 +176,13 @@ module ActiveRecord
|
|||
# #connection can be called any number of times; the connection is
|
||||
# held in a cache keyed by a thread.
|
||||
def connection
|
||||
@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] ||= checkout
|
||||
@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] ||= checkout
|
||||
end
|
||||
|
||||
def pin_connection!(lock_thread) # :nodoc:
|
||||
raise "There is already a pinned connection" if @pinned_connection
|
||||
|
||||
@pinned_connection = (@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] || checkout)
|
||||
@pinned_connection = (@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context] || checkout)
|
||||
# Any leased connection must be in @connections otherwise
|
||||
# some methods like #connected? won't behave correctly
|
||||
unless @connections.include?(@pinned_connection)
|
||||
|
@ -226,7 +226,7 @@ module ActiveRecord
|
|||
# #connection or #with_connection methods. Connections obtained through
|
||||
# #checkout will not be detected by #active_connection?
|
||||
def active_connection?
|
||||
@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
|
||||
@thread_cached_conns[ActiveSupport::IsolatedExecutionState.context]
|
||||
end
|
||||
|
||||
# Signal that the thread is finished with the current connection.
|
||||
|
@ -237,7 +237,7 @@ module ActiveRecord
|
|||
# #connection or #with_connection methods, connections obtained through
|
||||
# #checkout will not be automatically released.
|
||||
def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.context)
|
||||
if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread))
|
||||
if conn = @thread_cached_conns.delete(owner_thread)
|
||||
checkin conn
|
||||
end
|
||||
end
|
||||
|
@ -252,7 +252,7 @@ module ActiveRecord
|
|||
# connection will be properly returned to the pool by the code that checked
|
||||
# it out.
|
||||
def with_connection
|
||||
unless conn = @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
|
||||
unless conn = @thread_cached_conns[ActiveSupport::IsolatedExecutionState.context]
|
||||
conn = connection
|
||||
fresh_connection = true
|
||||
end
|
||||
|
@ -471,6 +471,8 @@ module ActiveRecord
|
|||
remove conn
|
||||
end
|
||||
end
|
||||
|
||||
prune_thread_cache
|
||||
end
|
||||
|
||||
# Disconnect all connections that have been idle for at least
|
||||
|
@ -558,15 +560,6 @@ module ActiveRecord
|
|||
end
|
||||
end
|
||||
|
||||
#--
|
||||
# From the discussion on GitHub:
|
||||
# https://github.com/rails/rails/pull/14938#commitcomment-6601951
|
||||
# This hook-in method allows for easier monkey-patching fixes needed by
|
||||
# JRuby users that use Fibers.
|
||||
def connection_cache_key(thread)
|
||||
thread
|
||||
end
|
||||
|
||||
# Take control of all existing connections so a "group" action such as
|
||||
# reload/disconnect can be performed safely. It is no longer enough to
|
||||
# wrap it in +synchronize+ because some pool's actions are allowed
|
||||
|
@ -710,10 +703,17 @@ module ActiveRecord
|
|||
#--
|
||||
# if owner_thread param is omitted, this must be called in synchronize block
|
||||
def remove_connection_from_thread_cache(conn, owner_thread = conn.owner)
|
||||
@thread_cached_conns.delete_pair(connection_cache_key(owner_thread), conn)
|
||||
@thread_cached_conns.delete_pair(owner_thread, conn)
|
||||
end
|
||||
alias_method :release, :remove_connection_from_thread_cache
|
||||
|
||||
def prune_thread_cache
|
||||
dead_threads = @thread_cached_conns.keys.reject(&:alive?)
|
||||
dead_threads.each do |dead_thread|
|
||||
@thread_cached_conns.delete(dead_thread)
|
||||
end
|
||||
end
|
||||
|
||||
def new_connection
|
||||
connection = db_config.new_connection
|
||||
connection.pool = self
|
||||
|
|
|
@ -13,8 +13,7 @@ module ActiveRecord
|
|||
:truncate_tables, :rollback_to_savepoint, :rollback_db_transaction, :restart_db_transaction,
|
||||
:exec_insert_all
|
||||
|
||||
base.set_callback :checkout, :after, :configure_query_cache!
|
||||
base.set_callback :checkin, :after, :disable_query_cache!
|
||||
base.set_callback :checkin, :after, :unset_query_cache!
|
||||
end
|
||||
|
||||
def dirties_query_cache(base, *method_names)
|
||||
|
@ -29,60 +28,153 @@ module ActiveRecord
|
|||
end
|
||||
end
|
||||
|
||||
module ConnectionPoolConfiguration
|
||||
def initialize(*)
|
||||
class Store # :nodoc:
|
||||
attr_reader :enabled
|
||||
alias_method :enabled?, :enabled
|
||||
|
||||
def initialize(max_size)
|
||||
@map = {}
|
||||
@max_size = max_size
|
||||
@enabled = false
|
||||
end
|
||||
|
||||
def enabled=(enabled)
|
||||
clear if @enabled && !enabled
|
||||
@enabled = enabled
|
||||
end
|
||||
|
||||
def size
|
||||
@map.size
|
||||
end
|
||||
|
||||
def empty?
|
||||
@map.empty?
|
||||
end
|
||||
|
||||
def [](key)
|
||||
return unless @enabled
|
||||
|
||||
if entry = @map.delete(key)
|
||||
@map[key] = entry
|
||||
end
|
||||
end
|
||||
|
||||
def compute_if_absent(key)
|
||||
return yield unless @enabled
|
||||
|
||||
if entry = @map.delete(key)
|
||||
return @map[key] = entry
|
||||
end
|
||||
|
||||
if @max_size && @map.size >= @max_size
|
||||
@map.shift # evict the oldest entry
|
||||
end
|
||||
|
||||
@map[key] ||= yield
|
||||
end
|
||||
|
||||
def clear
|
||||
@map.clear
|
||||
self
|
||||
end
|
||||
end
|
||||
|
||||
module ConnectionPoolConfiguration # :nodoc:
|
||||
def initialize(...)
|
||||
super
|
||||
@query_cache_enabled = Concurrent::Map.new { false }
|
||||
@thread_query_caches = Concurrent::Map.new(initial_capacity: @size)
|
||||
@query_cache_max_size = \
|
||||
case query_cache = db_config&.query_cache
|
||||
when 0, false
|
||||
nil
|
||||
when Integer
|
||||
query_cache
|
||||
when nil
|
||||
DEFAULT_SIZE
|
||||
end
|
||||
end
|
||||
|
||||
def checkout(...)
|
||||
connection = super
|
||||
connection.query_cache ||= query_cache
|
||||
connection
|
||||
end
|
||||
|
||||
# Disable the query cache within the block.
|
||||
def disable_query_cache
|
||||
cache = query_cache
|
||||
old, cache.enabled = cache.enabled, false
|
||||
begin
|
||||
yield
|
||||
ensure
|
||||
cache.enabled = old
|
||||
end
|
||||
end
|
||||
|
||||
def enable_query_cache
|
||||
cache = query_cache
|
||||
old, cache.enabled = cache.enabled, true
|
||||
begin
|
||||
yield
|
||||
ensure
|
||||
cache.enabled = old
|
||||
end
|
||||
end
|
||||
|
||||
def enable_query_cache!
|
||||
@query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] = true
|
||||
connection.enable_query_cache! if active_connection?
|
||||
query_cache.enabled = true
|
||||
end
|
||||
|
||||
def disable_query_cache!
|
||||
@query_cache_enabled.delete connection_cache_key(ActiveSupport::IsolatedExecutionState.context)
|
||||
connection.disable_query_cache! if active_connection?
|
||||
query_cache.enabled = false
|
||||
end
|
||||
|
||||
def query_cache_enabled
|
||||
@query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)]
|
||||
query_cache.enabled
|
||||
end
|
||||
|
||||
private
|
||||
def prune_thread_cache
|
||||
dead_threads = @thread_query_caches.keys.reject(&:alive?)
|
||||
dead_threads.each do |dead_thread|
|
||||
@thread_query_caches.delete(dead_thread)
|
||||
end
|
||||
end
|
||||
|
||||
def query_cache
|
||||
@thread_query_caches.compute_if_absent(ActiveSupport::IsolatedExecutionState.context) do
|
||||
Store.new(@query_cache_max_size)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
attr_reader :query_cache, :query_cache_enabled
|
||||
attr_accessor :query_cache
|
||||
|
||||
def initialize(*)
|
||||
super
|
||||
@query_cache = {}
|
||||
@query_cache_enabled = false
|
||||
@query_cache_max_size = nil
|
||||
@query_cache = nil
|
||||
end
|
||||
|
||||
def query_cache_enabled
|
||||
@query_cache&.enabled?
|
||||
end
|
||||
|
||||
# Enable the query cache within the block.
|
||||
def cache
|
||||
old, @query_cache_enabled = @query_cache_enabled, true
|
||||
yield
|
||||
ensure
|
||||
@query_cache_enabled = old
|
||||
clear_query_cache unless @query_cache_enabled
|
||||
def cache(&)
|
||||
pool.enable_query_cache(&)
|
||||
end
|
||||
|
||||
def enable_query_cache!
|
||||
@query_cache_enabled = true
|
||||
end
|
||||
|
||||
def disable_query_cache!
|
||||
@query_cache_enabled = false
|
||||
clear_query_cache
|
||||
pool.enable_query_cache!
|
||||
end
|
||||
|
||||
# Disable the query cache within the block.
|
||||
def uncached
|
||||
old, @query_cache_enabled = @query_cache_enabled, false
|
||||
yield
|
||||
ensure
|
||||
@query_cache_enabled = old
|
||||
def uncached(&)
|
||||
pool.disable_query_cache(&)
|
||||
end
|
||||
|
||||
def disable_query_cache!
|
||||
pool.disable_query_cache!
|
||||
end
|
||||
|
||||
# Clears the query cache.
|
||||
|
@ -93,7 +185,7 @@ module ActiveRecord
|
|||
# undermining the randomness you were expecting.
|
||||
def clear_query_cache
|
||||
@lock.synchronize do
|
||||
@query_cache.clear
|
||||
@query_cache&.clear
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -102,7 +194,7 @@ module ActiveRecord
|
|||
|
||||
# If arel is locked this is a SELECT ... FOR UPDATE or somesuch.
|
||||
# Such queries should not be cached.
|
||||
if @query_cache_enabled && !(arel.respond_to?(:locked) && arel.locked)
|
||||
if @query_cache&.enabled? && !(arel.respond_to?(:locked) && arel.locked)
|
||||
sql, binds, preparable = to_sql_and_binds(arel, binds, preparable)
|
||||
|
||||
if async
|
||||
|
@ -117,42 +209,37 @@ module ActiveRecord
|
|||
end
|
||||
|
||||
private
|
||||
def unset_query_cache!
|
||||
@query_cache = nil
|
||||
end
|
||||
|
||||
def lookup_sql_cache(sql, name, binds)
|
||||
key = binds.empty? ? sql : [sql, binds]
|
||||
hit = false
|
||||
result = nil
|
||||
|
||||
result = nil
|
||||
@lock.synchronize do
|
||||
if (result = @query_cache.delete(key))
|
||||
hit = true
|
||||
@query_cache[key] = result
|
||||
end
|
||||
result = @query_cache[key]
|
||||
end
|
||||
|
||||
if hit
|
||||
if result
|
||||
ActiveSupport::Notifications.instrument(
|
||||
"sql.active_record",
|
||||
cache_notification_info(sql, name, binds)
|
||||
)
|
||||
|
||||
result
|
||||
end
|
||||
|
||||
result
|
||||
end
|
||||
|
||||
def cache_sql(sql, name, binds)
|
||||
key = binds.empty? ? sql : [sql, binds]
|
||||
result = nil
|
||||
hit = false
|
||||
hit = true
|
||||
|
||||
@lock.synchronize do
|
||||
if (result = @query_cache.delete(key))
|
||||
hit = true
|
||||
@query_cache[key] = result
|
||||
else
|
||||
result = @query_cache[key] = yield
|
||||
if @query_cache_max_size && @query_cache.size > @query_cache_max_size
|
||||
@query_cache.shift
|
||||
end
|
||||
result = @query_cache.compute_if_absent(key) do
|
||||
hit = false
|
||||
yield
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -178,23 +265,6 @@ module ActiveRecord
|
|||
cached: true
|
||||
}
|
||||
end
|
||||
|
||||
def configure_query_cache!
|
||||
case query_cache = pool.db_config.query_cache
|
||||
when 0, false
|
||||
return
|
||||
when Integer
|
||||
@query_cache_max_size = query_cache
|
||||
when nil
|
||||
@query_cache_max_size = DEFAULT_SIZE
|
||||
else
|
||||
@query_cache_max_size = nil # no limit
|
||||
end
|
||||
|
||||
if pool.query_cache_enabled
|
||||
enable_query_cache!
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -8,7 +8,7 @@ module ActiveRecord
|
|||
# If it's not, it will execute the given block.
|
||||
def cache(&block)
|
||||
if connected? || !configurations.empty?
|
||||
connection.cache(&block)
|
||||
connection_pool.enable_query_cache(&block)
|
||||
else
|
||||
yield
|
||||
end
|
||||
|
@ -18,7 +18,7 @@ module ActiveRecord
|
|||
# If it's not, it will execute the given block.
|
||||
def uncached(&block)
|
||||
if connected? || !configurations.empty?
|
||||
connection.uncached(&block)
|
||||
connection_pool.disable_query_cache(&block)
|
||||
else
|
||||
yield
|
||||
end
|
||||
|
@ -26,11 +26,11 @@ module ActiveRecord
|
|||
end
|
||||
|
||||
def self.run
|
||||
ActiveRecord::Base.connection_handler.each_connection_pool.reject { |p| p.query_cache_enabled }.each { |p| p.enable_query_cache! }
|
||||
ActiveRecord::Base.connection_handler.each_connection_pool.reject(&:query_cache_enabled).each(&:enable_query_cache!)
|
||||
end
|
||||
|
||||
def self.complete(pools)
|
||||
pools.each { |pool| pool.disable_query_cache! }
|
||||
pools.each(&:disable_query_cache!)
|
||||
|
||||
ActiveRecord::Base.connection_handler.each_connection_pool do |pool|
|
||||
pool.release_connection if pool.active_connection? && !pool.connection.transaction_open?
|
||||
|
|
|
@ -808,7 +808,7 @@ class QueryCacheExpiryTest < ActiveRecord::TestCase
|
|||
end
|
||||
|
||||
def test_find
|
||||
assert_called(Task.connection.query_cache, :clear) do
|
||||
assert_called(Task.connection.query_cache, :clear, times: 2) do
|
||||
assert_not Task.connection.query_cache_enabled
|
||||
Task.cache do
|
||||
assert Task.connection.query_cache_enabled
|
||||
|
@ -922,9 +922,12 @@ class QueryCacheExpiryTest < ActiveRecord::TestCase
|
|||
end
|
||||
|
||||
def test_query_cache_lru_eviction
|
||||
store = ActiveRecord::ConnectionAdapters::QueryCache::Store.new(2)
|
||||
store.enabled = true
|
||||
|
||||
connection = Post.connection
|
||||
connection.pool.db_config.stub(:query_cache, 2) do
|
||||
connection.send(:configure_query_cache!)
|
||||
old_store, connection.query_cache = connection.query_cache, store
|
||||
begin
|
||||
Post.cache do
|
||||
assert_queries_count(2) do
|
||||
connection.select_all("SELECT 1")
|
||||
|
@ -945,9 +948,9 @@ class QueryCacheExpiryTest < ActiveRecord::TestCase
|
|||
connection.select_all("SELECT 2")
|
||||
end
|
||||
end
|
||||
ensure
|
||||
connection.query_cache = old_store
|
||||
end
|
||||
ensure
|
||||
connection.send(:configure_query_cache!)
|
||||
end
|
||||
|
||||
test "threads use the same connection" do
|
||||
|
|
Loading…
Reference in New Issue