Merge pull request #41495 from eileencodes/make-executor-configurable

Allow async executor to be configurable
This commit is contained in:
Eileen M. Uchitelle 2021-02-19 14:03:34 -05:00 committed by GitHub
commit 9437f6da6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 394 additions and 122 deletions

View File

@ -1,3 +1,17 @@
* Allow applications to configure the thread pool for async queries
Some applications may want one thread pool per database whereas others want to use
a single global thread pool for all queries. By default Rails will set `async_query_executor`
to `:immediate` and create a `Concurrent::ImmediateExecutor` object which is essentially a no-op.
To create one thread pool for all database connections to use applications can set
`config.active_record.async_query_executor` to `:global_thread_pool` and optionally define
`config.active_record.global_executor_concurrency`. This defaults to 4. For applications that want
to have a thread pool for each database connection, `config.active_record.async_query_executor` can
be set to `:multi_thread_pool`. The configuration for each thread pool is set in the database
configuration.
*Eileen M. Uchitelle*
* Allow new syntax for `enum` to avoid leading `_` from reserved options.
Before:

View File

@ -144,12 +144,7 @@ module ActiveRecord
@lock_thread = false
@async_executor = Concurrent::ThreadPoolExecutor.new(
min_threads: 0,
max_threads: @size,
max_queue: @size * 4,
fallback_policy: :caller_runs
)
@async_executor = build_async_executor
@reaper = Reaper.new(self, db_config.reaping_frequency)
@reaper.run
@ -463,6 +458,22 @@ module ActiveRecord
end
private
def build_async_executor
case Base.async_query_executor
when :multi_thread_pool
Concurrent::ThreadPoolExecutor.new(
min_threads: @db_config.min_threads,
max_threads: @db_config.max_threads,
max_queue: @db_config.max_queue,
fallback_policy: :caller_runs
)
when :global_thread_pool
Base.global_thread_pool_async_query_executor
else
Base.immediate_query_executor
end
end
#--
# this is unfortunately not concurrent
def bulk_make_new_connections(num_new_conns_needed)

View File

@ -157,6 +157,44 @@ module ActiveRecord
mattr_accessor :application_record_class, instance_accessor: false, default: nil
# Sets the async_query_executor for an application. By default the thread pool executor
# set to `:immediate. Options are:
#
# * :immediate - Initializes a single +Concurrent::ImmediateExecutor+
# * :global_thread_pool - Initializes a single +Concurrent::ThreadPoolExecutor+
# that uses the +async_query_concurrency+ for the +max_threads+ value.
# * :multi_thread_pool - Initializes a +Concurrent::ThreadPoolExecutor+ for each
# database connection. The initializer values are defined in the configuration hash.
mattr_accessor :async_query_executor, instance_accessor: false, default: :immediate
def self.immediate_query_executor # :nodoc:
@@immediate_query_executor ||= Concurrent::ImmediateExecutor.new
end
def self.global_thread_pool_async_query_executor # :nodoc:
concurrency = global_executor_concurrency || 4
@@global_thread_pool_async_query_executor ||= Concurrent::ThreadPoolExecutor.new(
min_threads: 0,
max_threads: concurrency,
max_queue: concurrency * 4,
fallback_policy: :caller_runs
)
end
# Set the +global_executor_concurrency+. This configuration value can only be used
# with the global thread pool async query executor.
def self.global_executor_concurrency=(global_executor_concurrency)
if async_query_executor == :immediate || async_query_executor == :multi_thread_pool
raise ArgumentError, "`global_executor_concurrency` cannot be set when using either immediate or multiple thread pools. For multiple thread pools, please set the concurrency in your database configuration. Immediate thread pools are essentially a no-op."
end
@@global_executor_concurrency = global_executor_concurrency
end
def self.global_executor_concurrency # :nodoc:
@@global_executor_concurrency ||= nil
end
def self.application_record_class? # :nodoc:
if Base.application_record_class
self == Base.application_record_class

View File

@ -48,6 +48,18 @@ module ActiveRecord
raise NotImplementedError
end
def min_threads
raise NotImplementedError
end
def max_threads
raise NotImplementedError
end
def max_queue
raise NotImplementedError
end
def checkout_timeout
raise NotImplementedError
end

View File

@ -66,6 +66,18 @@ module ActiveRecord
(configuration_hash[:pool] || 5).to_i
end
def min_threads
(configuration_hash[:min_threads] || 0).to_i
end
def max_threads
(configuration_hash[:max_threads] || pool).to_i
end
def max_queue
max_threads * 4
end
def checkout_timeout
(configuration_hash[:checkout_timeout] || 5).to_f
end

View File

@ -343,122 +343,6 @@ module ActiveRecord
end
end
module AsynchronousQueriesSharedTests
def test_async_select_failure
ActiveRecord::Base.asynchronous_queries_tracker.start_session
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
assert_raises ActiveRecord::StatementInvalid do
future_result.result
end
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
end
def test_async_query_from_transaction
ActiveRecord::Base.asynchronous_queries_tracker.start_session
assert_nothing_raised do
@connection.select_all "SELECT * FROM posts", async: true
end
@connection.transaction do
assert_raises AsynchronousQueryInsideTransactionError do
@connection.select_all "SELECT * FROM posts", async: true
end
end
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
end
def test_async_query_cache
ActiveRecord::Base.asynchronous_queries_tracker.start_session
@connection.enable_query_cache!
@connection.select_all "SELECT * FROM posts"
result = @connection.select_all "SELECT * FROM posts", async: true
assert_equal Result, result.class
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
@connection.disable_query_cache!
end
def test_async_query_foreground_fallback
status = {}
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:sql] == "SELECT * FROM does_not_exists"
status[:executed] = true
status[:async] = event.payload[:async]
end
end
@connection.pool.stub(:schedule_query, proc { }) do
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
assert_raises ActiveRecord::StatementInvalid do
future_result.result
end
end
assert_equal true, status[:executed]
assert_equal false, status[:async]
ensure
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
end
class AsynchronousQueriesTest < ActiveRecord::TestCase
self.use_transactional_tests = false
include AsynchronousQueriesSharedTests
def setup
@connection = ActiveRecord::Base.connection
end
def test_async_select_all
ActiveRecord::Base.asynchronous_queries_tracker.start_session
status = {}
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:sql] == "SELECT * FROM posts"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
end
future_result = @connection.select_all "SELECT * FROM posts", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
monitor.synchronize do
condition.wait_until { status[:executed] }
end
assert_kind_of ActiveRecord::Result, future_result.result
assert_equal @connection.supports_concurrent_connections?, status[:async]
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
end
class AsynchronousQueriesWithTransactionalTest < ActiveRecord::TestCase
self.use_transactional_tests = true
include AsynchronousQueriesSharedTests
def setup
@connection = ActiveRecord::Base.connection
@connection.materialize_transactions
end
end
class AdapterForeignKeyTest < ActiveRecord::TestCase
self.use_transactional_tests = false

View File

@ -0,0 +1,267 @@
# frozen_string_literal: true
require "cases/helper"
require "support/connection_helper"
require "models/post"
module AsynchronousQueriesSharedTests
def test_async_select_failure
ActiveRecord::Base.asynchronous_queries_tracker.start_session
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
assert_raises ActiveRecord::StatementInvalid do
future_result.result
end
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
end
def test_async_query_from_transaction
ActiveRecord::Base.asynchronous_queries_tracker.start_session
assert_nothing_raised do
@connection.select_all "SELECT * FROM posts", async: true
end
@connection.transaction do
assert_raises ActiveRecord::AsynchronousQueryInsideTransactionError do
@connection.select_all "SELECT * FROM posts", async: true
end
end
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
end
def test_async_query_cache
ActiveRecord::Base.asynchronous_queries_tracker.start_session
@connection.enable_query_cache!
@connection.select_all "SELECT * FROM posts"
result = @connection.select_all "SELECT * FROM posts", async: true
assert_equal ActiveRecord::Result, result.class
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
@connection.disable_query_cache!
end
def test_async_query_foreground_fallback
status = {}
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:sql] == "SELECT * FROM does_not_exists"
status[:executed] = true
status[:async] = event.payload[:async]
end
end
@connection.pool.stub(:schedule_query, proc { }) do
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
assert_raises ActiveRecord::StatementInvalid do
future_result.result
end
end
assert_equal true, status[:executed]
assert_equal false, status[:async]
ensure
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
end
class AsynchronousQueriesTest < ActiveRecord::TestCase
self.use_transactional_tests = false
include AsynchronousQueriesSharedTests
def setup
@connection = ActiveRecord::Base.connection
end
def test_async_select_all
ActiveRecord::Base.asynchronous_queries_tracker.start_session
status = {}
monitor = Monitor.new
condition = monitor.new_cond
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
if event.payload[:sql] == "SELECT * FROM posts"
status[:executed] = true
status[:async] = event.payload[:async]
monitor.synchronize { condition.signal }
end
end
future_result = @connection.select_all "SELECT * FROM posts", async: true
assert_kind_of ActiveRecord::FutureResult, future_result
monitor.synchronize do
condition.wait_until { status[:executed] }
end
assert_kind_of ActiveRecord::Result, future_result.result
assert_equal @connection.supports_concurrent_connections?, status[:async]
ensure
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
end
class AsynchronousQueriesWithTransactionalTest < ActiveRecord::TestCase
self.use_transactional_tests = true
include AsynchronousQueriesSharedTests
def setup
@connection = ActiveRecord::Base.connection
@connection.materialize_transactions
end
end
class AsynchronousExecutorTypeTest < ActiveRecord::TestCase
def test_immediate_configuration_uses_a_single_immediate_executor_by_default
old_value = ActiveRecord::Base.async_query_executor
ActiveRecord::Base.async_query_executor = :immediate
handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary")
db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary")
pool1 = handler.establish_connection(db_config)
pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model)
async_pool1 = pool1.instance_variable_get(:@async_executor)
async_pool2 = pool2.instance_variable_get(:@async_executor)
assert async_pool1.is_a?(Concurrent::ImmediateExecutor)
assert async_pool2.is_a?(Concurrent::ImmediateExecutor)
assert_equal 2, handler.all_connection_pools.count
assert_equal async_pool1, async_pool2
ensure
clean_up_connection_handler
ActiveRecord::Base.async_query_executor = old_value
end
def test_one_global_thread_pool_is_used_when_set_with_default_concurrency
old_value = ActiveRecord::Base.async_query_executor
ActiveRecord::Base.async_query_executor = :global_thread_pool
handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary")
db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary")
pool1 = handler.establish_connection(db_config)
pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model)
async_pool1 = pool1.instance_variable_get(:@async_executor)
async_pool2 = pool2.instance_variable_get(:@async_executor)
assert async_pool1.is_a?(Concurrent::ThreadPoolExecutor)
assert async_pool2.is_a?(Concurrent::ThreadPoolExecutor)
assert 0, async_pool1.min_length
assert 4, async_pool1.max_length
assert 16, async_pool1.max_queue
assert :caller_runs, async_pool1.fallback_policy
assert 0, async_pool2.min_length
assert 4, async_pool2.max_length
assert 16, async_pool2.max_queue
assert :caller_runs, async_pool2.fallback_policy
assert_equal 2, handler.all_connection_pools.count
assert_equal async_pool1, async_pool2
ensure
clean_up_connection_handler
ActiveRecord::Base.async_query_executor = old_value
end
def test_concurrency_can_be_set_on_global_thread_pool
old_value = ActiveRecord::Base.async_query_executor
ActiveRecord::Base.async_query_executor = :global_thread_pool
old_concurrency = ActiveRecord::Base.global_executor_concurrency
ActiveRecord::Base.global_executor_concurrency = 8
handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary")
db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary")
pool1 = handler.establish_connection(db_config)
pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model)
async_pool1 = pool1.instance_variable_get(:@async_executor)
async_pool2 = pool2.instance_variable_get(:@async_executor)
assert async_pool1.is_a?(Concurrent::ThreadPoolExecutor)
assert async_pool2.is_a?(Concurrent::ThreadPoolExecutor)
assert 0, async_pool1.min_length
assert 8, async_pool1.max_length
assert 32, async_pool1.max_queue
assert :caller_runs, async_pool1.fallback_policy
assert 0, async_pool2.min_length
assert 8, async_pool2.max_length
assert 32, async_pool2.max_queue
assert :caller_runs, async_pool2.fallback_policy
assert_equal 2, handler.all_connection_pools.count
assert_equal async_pool1, async_pool2
ensure
clean_up_connection_handler
ActiveRecord::Base.global_executor_concurrency = old_concurrency
ActiveRecord::Base.async_query_executor = old_value
end
def test_concurrency_cannot_be_set_with_immediate_or_multi_thread_pool
old_value = ActiveRecord::Base.async_query_executor
ActiveRecord::Base.async_query_executor = :immediate
assert_raises ArgumentError do
ActiveRecord::Base.global_executor_concurrency = 8
end
ActiveRecord::Base.async_query_executor = :multi_thread_pool
assert_raises ArgumentError do
ActiveRecord::Base.global_executor_concurrency = 8
end
ensure
ActiveRecord::Base.async_query_executor = old_value
end
def test_one_global_thread_pool_uses_concurrency_if_set
old_value = ActiveRecord::Base.async_query_executor
ActiveRecord::Base.async_query_executor = :multi_thread_pool
handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
config_hash = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary").configuration_hash
new_config_hash = config_hash.merge(min_threads: 0, max_threads: 10)
db_config = ActiveRecord::DatabaseConfigurations::HashConfig.new("arunit", "primary", new_config_hash)
db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary")
pool1 = handler.establish_connection(db_config)
pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model)
async_pool1 = pool1.instance_variable_get(:@async_executor)
async_pool2 = pool2.instance_variable_get(:@async_executor)
assert async_pool1.is_a?(Concurrent::ThreadPoolExecutor)
assert async_pool2.is_a?(Concurrent::ThreadPoolExecutor)
assert 0, async_pool1.min_length
assert 10, async_pool1.max_length
assert 40, async_pool1.max_queue
assert :caller_runs, async_pool1.fallback_policy
assert 0, async_pool2.min_length
assert 4, async_pool2.max_length
assert 16, async_pool2.max_queue
assert :caller_runs, async_pool2.fallback_policy
assert_equal 2, handler.all_connection_pools.count
assert_not_equal async_pool1, async_pool2
ensure
clean_up_connection_handler
ActiveRecord::Base.async_query_executor = old_value
end
end

View File

@ -20,6 +20,39 @@ module ActiveRecord
assert_equal 5, config.pool
end
def test_min_threads_with_value
config = HashConfig.new("default_env", "primary", min_threads: "1")
assert_equal 1, config.min_threads
end
def test_min_threads_default
config = HashConfig.new("default_env", "primary", {})
assert_equal 0, config.min_threads
end
def test_max_threads_with_value
config = HashConfig.new("default_env", "primary", max_threads: "10")
assert_equal 10, config.max_threads
end
def test_max_threads_default_uses_pool_default
config = HashConfig.new("default_env", "primary", {})
assert_equal 5, config.pool
assert_equal 5, config.max_threads
end
def test_max_threads_uses_pool_when_set
config = HashConfig.new("default_env", "primary", pool: 1)
assert_equal 1, config.pool
assert_equal 1, config.max_threads
end
def test_max_queue_is_pool_multipled_by_4
config = HashConfig.new("default_env", "primary", {})
assert_equal 5, config.max_threads
assert_equal config.max_threads * 4, config.max_queue
end
def test_checkout_timeout_default_when_nil
config = HashConfig.new("default_env", "primary", checkout_timeout: nil)
assert_equal 5.0, config.checkout_timeout

View File

@ -20,6 +20,7 @@ module ARTest
def self.connect
ActiveRecord::Base.legacy_connection_handling = false
ActiveRecord::Base.async_query_executor = :global_thread_pool
puts "Using #{connection_name}"
ActiveRecord::Base.logger = ActiveSupport::Logger.new("debug.log", 0, 100 * 1024 * 1024)
ActiveRecord::Base.configurations = test_configuration_hashes