From d9639a211fd358c8aa0b838e88b9840231a31b88 Mon Sep 17 00:00:00 2001 From: eileencodes Date: Thu, 18 Feb 2021 13:10:57 -0500 Subject: [PATCH] Allow async executor to be configurable This is a followup/alternative to #41406. The change in #41406 wouldn't work for GitHub because we intend to implement an executor for each database and use the database configuration to set the `min_threads` and `max_threads` for each one. The changes here borrow from #41406 by implementing a `Concurrent::ImmediateExecutor` by default. Otherwise, applications have the option of having one global thread pool that is used by all connections or a thread pool for each connection. A global thread pool can be set with `config.active_record.async_query_executor = :global_thread_pool`. This will create a single `Concurrent::ThreadPoolExecutor` for applications to utilize. By default the concurrency is 4, but it can be changed for the `global_thread_pool` by setting `global_executor_concurrency` to another number. If applications want to use a thread pool per database connection they can set `config.active_record.async_query_executor = :multi_thread_pool`. This will create a `Concurrent::ThreadPoolExecutor` for each database connection and set the `min_threads` and `max_threads` by their configuration values or the defaults. I've also moved the async tests out of the adapter test and into their own tests and added tests for all the new functionality. This change would allow us at GitHub to control threads per database and per writer/reader or other apps to use one global executor. The immediate executor allows apps to no-op by default. Took the immediate executor idea from Jean's PR. 
Co-authored-by: Jean Boussier --- activerecord/CHANGELOG.md | 14 + .../abstract/connection_pool.rb | 23 +- activerecord/lib/active_record/core.rb | 38 +++ .../database_config.rb | 12 + .../database_configurations/hash_config.rb | 12 + activerecord/test/cases/adapter_test.rb | 116 -------- .../test/cases/asynchronous_queries_test.rb | 267 ++++++++++++++++++ .../hash_config_test.rb | 33 +++ activerecord/test/support/connection.rb | 1 + 9 files changed, 394 insertions(+), 122 deletions(-) create mode 100644 activerecord/test/cases/asynchronous_queries_test.rb diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md index 5fbb20c7cb0..d2b06dea0e5 100644 --- a/activerecord/CHANGELOG.md +++ b/activerecord/CHANGELOG.md @@ -1,3 +1,17 @@ +* Allow applications to configure the thread pool for async queries + + Some applications may want one thread pool per database whereas others want to use + a single global thread pool for all queries. By default Rails will set `async_query_executor` + to `:immediate` and create a `Concurrent::ImmediateExecutor` object which is essentially a no-op. + To create one thread pool for all database connections to use applications can set + `config.active_record.async_query_executor` to `:global_thread_pool` and optionally define + `config.active_record.global_executor_concurrency`. This defaults to 4. For applications that want + to have a thread pool for each database connection, `config.active_record.async_query_executor` can + be set to `:multi_thread_pool`. The configuration for each thread pool is set in the database + configuration. + + *Eileen M. Uchitelle* + * Allow new syntax for `enum` to avoid leading `_` from reserved options. 
Before: diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 5670790844f..6582a6df545 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -144,12 +144,7 @@ module ActiveRecord @lock_thread = false - @async_executor = Concurrent::ThreadPoolExecutor.new( - min_threads: 0, - max_threads: @size, - max_queue: @size * 4, - fallback_policy: :caller_runs - ) + @async_executor = build_async_executor @reaper = Reaper.new(self, db_config.reaping_frequency) @reaper.run @@ -463,6 +458,22 @@ module ActiveRecord end private + def build_async_executor + case Base.async_query_executor + when :multi_thread_pool + Concurrent::ThreadPoolExecutor.new( + min_threads: @db_config.min_threads, + max_threads: @db_config.max_threads, + max_queue: @db_config.max_queue, + fallback_policy: :caller_runs + ) + when :global_thread_pool + Base.global_thread_pool_async_query_executor + else + Base.immediate_query_executor + end + end + #-- # this is unfortunately not concurrent def bulk_make_new_connections(num_new_conns_needed) diff --git a/activerecord/lib/active_record/core.rb b/activerecord/lib/active_record/core.rb index 3713da970f1..643b13670ba 100644 --- a/activerecord/lib/active_record/core.rb +++ b/activerecord/lib/active_record/core.rb @@ -157,6 +157,44 @@ module ActiveRecord mattr_accessor :application_record_class, instance_accessor: false, default: nil + # Sets the async_query_executor for an application. By default the thread pool executor + # set to `:immediate. Options are: + # + # * :immediate - Initializes a single +Concurrent::ImmediateExecutor+ + # * :global_thread_pool - Initializes a single +Concurrent::ThreadPoolExecutor+ + # that uses the +async_query_concurrency+ for the +max_threads+ value. 
+ # * :multi_thread_pool - Initializes a +Concurrent::ThreadPoolExecutor+ for each + # database connection. The initializer values are defined in the configuration hash. + mattr_accessor :async_query_executor, instance_accessor: false, default: :immediate + + def self.immediate_query_executor # :nodoc: + @@immediate_query_executor ||= Concurrent::ImmediateExecutor.new + end + + def self.global_thread_pool_async_query_executor # :nodoc: + concurrency = global_executor_concurrency || 4 + @@global_thread_pool_async_query_executor ||= Concurrent::ThreadPoolExecutor.new( + min_threads: 0, + max_threads: concurrency, + max_queue: concurrency * 4, + fallback_policy: :caller_runs + ) + end + + # Set the +global_executor_concurrency+. This configuration value can only be used + # with the global thread pool async query executor. + def self.global_executor_concurrency=(global_executor_concurrency) + if async_query_executor == :immediate || async_query_executor == :multi_thread_pool + raise ArgumentError, "`global_executor_concurrency` cannot be set when using either immediate or multiple thread pools. For multiple thread pools, please set the concurrency in your database configuration. Immediate thread pools are essentially a no-op." + end + + @@global_executor_concurrency = global_executor_concurrency + end + + def self.global_executor_concurrency # :nodoc: + @@global_executor_concurrency ||= nil + end + def self.application_record_class? 
# :nodoc: if Base.application_record_class self == Base.application_record_class diff --git a/activerecord/lib/active_record/database_configurations/database_config.rb b/activerecord/lib/active_record/database_configurations/database_config.rb index 6ad9b3922e7..aeb22993f88 100644 --- a/activerecord/lib/active_record/database_configurations/database_config.rb +++ b/activerecord/lib/active_record/database_configurations/database_config.rb @@ -48,6 +48,18 @@ module ActiveRecord raise NotImplementedError end + def min_threads + raise NotImplementedError + end + + def max_threads + raise NotImplementedError + end + + def max_queue + raise NotImplementedError + end + def checkout_timeout raise NotImplementedError end diff --git a/activerecord/lib/active_record/database_configurations/hash_config.rb b/activerecord/lib/active_record/database_configurations/hash_config.rb index 66d0abb83b6..6960dfd06a6 100644 --- a/activerecord/lib/active_record/database_configurations/hash_config.rb +++ b/activerecord/lib/active_record/database_configurations/hash_config.rb @@ -66,6 +66,18 @@ module ActiveRecord (configuration_hash[:pool] || 5).to_i end + def min_threads + (configuration_hash[:min_threads] || 0).to_i + end + + def max_threads + (configuration_hash[:max_threads] || pool).to_i + end + + def max_queue + max_threads * 4 + end + def checkout_timeout (configuration_hash[:checkout_timeout] || 5).to_f end diff --git a/activerecord/test/cases/adapter_test.rb b/activerecord/test/cases/adapter_test.rb index 92f1d207af6..45fc947f7fa 100644 --- a/activerecord/test/cases/adapter_test.rb +++ b/activerecord/test/cases/adapter_test.rb @@ -343,122 +343,6 @@ module ActiveRecord end end - module AsynchronousQueriesSharedTests - def test_async_select_failure - ActiveRecord::Base.asynchronous_queries_tracker.start_session - - future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true - assert_kind_of ActiveRecord::FutureResult, future_result - assert_raises 
ActiveRecord::StatementInvalid do - future_result.result - end - ensure - ActiveRecord::Base.asynchronous_queries_tracker.finalize_session - end - - def test_async_query_from_transaction - ActiveRecord::Base.asynchronous_queries_tracker.start_session - - assert_nothing_raised do - @connection.select_all "SELECT * FROM posts", async: true - end - - @connection.transaction do - assert_raises AsynchronousQueryInsideTransactionError do - @connection.select_all "SELECT * FROM posts", async: true - end - end - ensure - ActiveRecord::Base.asynchronous_queries_tracker.finalize_session - end - - def test_async_query_cache - ActiveRecord::Base.asynchronous_queries_tracker.start_session - - @connection.enable_query_cache! - - @connection.select_all "SELECT * FROM posts" - result = @connection.select_all "SELECT * FROM posts", async: true - assert_equal Result, result.class - ensure - ActiveRecord::Base.asynchronous_queries_tracker.finalize_session - @connection.disable_query_cache! - end - - def test_async_query_foreground_fallback - status = {} - - subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| - if event.payload[:sql] == "SELECT * FROM does_not_exists" - status[:executed] = true - status[:async] = event.payload[:async] - end - end - - @connection.pool.stub(:schedule_query, proc { }) do - future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true - assert_kind_of ActiveRecord::FutureResult, future_result - assert_raises ActiveRecord::StatementInvalid do - future_result.result - end - end - - assert_equal true, status[:executed] - assert_equal false, status[:async] - ensure - ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber - end - end - - class AsynchronousQueriesTest < ActiveRecord::TestCase - self.use_transactional_tests = false - - include AsynchronousQueriesSharedTests - - def setup - @connection = ActiveRecord::Base.connection - end - - def test_async_select_all - 
ActiveRecord::Base.asynchronous_queries_tracker.start_session - status = {} - - monitor = Monitor.new - condition = monitor.new_cond - - subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| - if event.payload[:sql] == "SELECT * FROM posts" - status[:executed] = true - status[:async] = event.payload[:async] - monitor.synchronize { condition.signal } - end - end - - future_result = @connection.select_all "SELECT * FROM posts", async: true - assert_kind_of ActiveRecord::FutureResult, future_result - - monitor.synchronize do - condition.wait_until { status[:executed] } - end - assert_kind_of ActiveRecord::Result, future_result.result - assert_equal @connection.supports_concurrent_connections?, status[:async] - ensure - ActiveRecord::Base.asynchronous_queries_tracker.finalize_session - ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber - end - end - - class AsynchronousQueriesWithTransactionalTest < ActiveRecord::TestCase - self.use_transactional_tests = true - - include AsynchronousQueriesSharedTests - - def setup - @connection = ActiveRecord::Base.connection - @connection.materialize_transactions - end - end - class AdapterForeignKeyTest < ActiveRecord::TestCase self.use_transactional_tests = false diff --git a/activerecord/test/cases/asynchronous_queries_test.rb b/activerecord/test/cases/asynchronous_queries_test.rb new file mode 100644 index 00000000000..1325a7001eb --- /dev/null +++ b/activerecord/test/cases/asynchronous_queries_test.rb @@ -0,0 +1,267 @@ +# frozen_string_literal: true + +require "cases/helper" +require "support/connection_helper" +require "models/post" + +module AsynchronousQueriesSharedTests + def test_async_select_failure + ActiveRecord::Base.asynchronous_queries_tracker.start_session + + future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true + assert_kind_of ActiveRecord::FutureResult, future_result + assert_raises ActiveRecord::StatementInvalid do + future_result.result 
+ end + ensure + ActiveRecord::Base.asynchronous_queries_tracker.finalize_session + end + + def test_async_query_from_transaction + ActiveRecord::Base.asynchronous_queries_tracker.start_session + + assert_nothing_raised do + @connection.select_all "SELECT * FROM posts", async: true + end + + @connection.transaction do + assert_raises ActiveRecord::AsynchronousQueryInsideTransactionError do + @connection.select_all "SELECT * FROM posts", async: true + end + end + ensure + ActiveRecord::Base.asynchronous_queries_tracker.finalize_session + end + + def test_async_query_cache + ActiveRecord::Base.asynchronous_queries_tracker.start_session + + @connection.enable_query_cache! + + @connection.select_all "SELECT * FROM posts" + result = @connection.select_all "SELECT * FROM posts", async: true + assert_equal ActiveRecord::Result, result.class + ensure + ActiveRecord::Base.asynchronous_queries_tracker.finalize_session + @connection.disable_query_cache! + end + + def test_async_query_foreground_fallback + status = {} + + subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| + if event.payload[:sql] == "SELECT * FROM does_not_exists" + status[:executed] = true + status[:async] = event.payload[:async] + end + end + + @connection.pool.stub(:schedule_query, proc { }) do + future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true + assert_kind_of ActiveRecord::FutureResult, future_result + assert_raises ActiveRecord::StatementInvalid do + future_result.result + end + end + + assert_equal true, status[:executed] + assert_equal false, status[:async] + ensure + ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber + end +end + +class AsynchronousQueriesTest < ActiveRecord::TestCase + self.use_transactional_tests = false + + include AsynchronousQueriesSharedTests + + def setup + @connection = ActiveRecord::Base.connection + end + + def test_async_select_all + 
ActiveRecord::Base.asynchronous_queries_tracker.start_session + status = {} + + monitor = Monitor.new + condition = monitor.new_cond + + subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| + if event.payload[:sql] == "SELECT * FROM posts" + status[:executed] = true + status[:async] = event.payload[:async] + monitor.synchronize { condition.signal } + end + end + + future_result = @connection.select_all "SELECT * FROM posts", async: true + assert_kind_of ActiveRecord::FutureResult, future_result + + monitor.synchronize do + condition.wait_until { status[:executed] } + end + assert_kind_of ActiveRecord::Result, future_result.result + assert_equal @connection.supports_concurrent_connections?, status[:async] + ensure + ActiveRecord::Base.asynchronous_queries_tracker.finalize_session + ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber + end +end + +class AsynchronousQueriesWithTransactionalTest < ActiveRecord::TestCase + self.use_transactional_tests = true + + include AsynchronousQueriesSharedTests + + def setup + @connection = ActiveRecord::Base.connection + @connection.materialize_transactions + end +end + +class AsynchronousExecutorTypeTest < ActiveRecord::TestCase + def test_immediate_configuration_uses_a_single_immediate_executor_by_default + old_value = ActiveRecord::Base.async_query_executor + ActiveRecord::Base.async_query_executor = :immediate + + handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new + db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary") + db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary") + pool1 = handler.establish_connection(db_config) + pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model) + + async_pool1 = pool1.instance_variable_get(:@async_executor) + async_pool2 = pool2.instance_variable_get(:@async_executor) + + assert async_pool1.is_a?(Concurrent::ImmediateExecutor) + 
assert async_pool2.is_a?(Concurrent::ImmediateExecutor) + + assert_equal 2, handler.all_connection_pools.count + assert_equal async_pool1, async_pool2 + ensure + clean_up_connection_handler + ActiveRecord::Base.async_query_executor = old_value + end + + def test_one_global_thread_pool_is_used_when_set_with_default_concurrency + old_value = ActiveRecord::Base.async_query_executor + ActiveRecord::Base.async_query_executor = :global_thread_pool + + handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new + db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary") + db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary") + pool1 = handler.establish_connection(db_config) + pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model) + + async_pool1 = pool1.instance_variable_get(:@async_executor) + async_pool2 = pool2.instance_variable_get(:@async_executor) + + assert async_pool1.is_a?(Concurrent::ThreadPoolExecutor) + assert async_pool2.is_a?(Concurrent::ThreadPoolExecutor) + + assert 0, async_pool1.min_length + assert 4, async_pool1.max_length + assert 16, async_pool1.max_queue + assert :caller_runs, async_pool1.fallback_policy + + assert 0, async_pool2.min_length + assert 4, async_pool2.max_length + assert 16, async_pool2.max_queue + assert :caller_runs, async_pool2.fallback_policy + + assert_equal 2, handler.all_connection_pools.count + assert_equal async_pool1, async_pool2 + ensure + clean_up_connection_handler + ActiveRecord::Base.async_query_executor = old_value + end + + def test_concurrency_can_be_set_on_global_thread_pool + old_value = ActiveRecord::Base.async_query_executor + ActiveRecord::Base.async_query_executor = :global_thread_pool + old_concurrency = ActiveRecord::Base.global_executor_concurrency + ActiveRecord::Base.global_executor_concurrency = 8 + + handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new + db_config = 
ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary") + db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary") + pool1 = handler.establish_connection(db_config) + pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model) + + async_pool1 = pool1.instance_variable_get(:@async_executor) + async_pool2 = pool2.instance_variable_get(:@async_executor) + + assert async_pool1.is_a?(Concurrent::ThreadPoolExecutor) + assert async_pool2.is_a?(Concurrent::ThreadPoolExecutor) + + assert 0, async_pool1.min_length + assert 8, async_pool1.max_length + assert 32, async_pool1.max_queue + assert :caller_runs, async_pool1.fallback_policy + + assert 0, async_pool2.min_length + assert 8, async_pool2.max_length + assert 32, async_pool2.max_queue + assert :caller_runs, async_pool2.fallback_policy + + assert_equal 2, handler.all_connection_pools.count + assert_equal async_pool1, async_pool2 + ensure + clean_up_connection_handler + ActiveRecord::Base.global_executor_concurrency = old_concurrency + ActiveRecord::Base.async_query_executor = old_value + end + + def test_concurrency_cannot_be_set_with_immediate_or_multi_thread_pool + old_value = ActiveRecord::Base.async_query_executor + ActiveRecord::Base.async_query_executor = :immediate + + assert_raises ArgumentError do + ActiveRecord::Base.global_executor_concurrency = 8 + end + + ActiveRecord::Base.async_query_executor = :multi_thread_pool + + assert_raises ArgumentError do + ActiveRecord::Base.global_executor_concurrency = 8 + end + ensure + ActiveRecord::Base.async_query_executor = old_value + end + + def test_one_global_thread_pool_uses_concurrency_if_set + old_value = ActiveRecord::Base.async_query_executor + ActiveRecord::Base.async_query_executor = :multi_thread_pool + + handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new + config_hash = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: 
"primary").configuration_hash + new_config_hash = config_hash.merge(min_threads: 0, max_threads: 10) + db_config = ActiveRecord::DatabaseConfigurations::HashConfig.new("arunit", "primary", new_config_hash) + db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary") + pool1 = handler.establish_connection(db_config) + pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model) + + async_pool1 = pool1.instance_variable_get(:@async_executor) + async_pool2 = pool2.instance_variable_get(:@async_executor) + + assert async_pool1.is_a?(Concurrent::ThreadPoolExecutor) + assert async_pool2.is_a?(Concurrent::ThreadPoolExecutor) + + assert 0, async_pool1.min_length + assert 10, async_pool1.max_length + assert 40, async_pool1.max_queue + assert :caller_runs, async_pool1.fallback_policy + + assert 0, async_pool2.min_length + assert 4, async_pool2.max_length + assert 16, async_pool2.max_queue + assert :caller_runs, async_pool2.fallback_policy + + assert_equal 2, handler.all_connection_pools.count + assert_not_equal async_pool1, async_pool2 + ensure + clean_up_connection_handler + ActiveRecord::Base.async_query_executor = old_value + end +end diff --git a/activerecord/test/cases/database_configurations/hash_config_test.rb b/activerecord/test/cases/database_configurations/hash_config_test.rb index 1eb0bc7ee5f..f7991d58668 100644 --- a/activerecord/test/cases/database_configurations/hash_config_test.rb +++ b/activerecord/test/cases/database_configurations/hash_config_test.rb @@ -20,6 +20,39 @@ module ActiveRecord assert_equal 5, config.pool end + def test_min_threads_with_value + config = HashConfig.new("default_env", "primary", min_threads: "1") + assert_equal 1, config.min_threads + end + + def test_min_threads_default + config = HashConfig.new("default_env", "primary", {}) + assert_equal 0, config.min_threads + end + + def test_max_threads_with_value + config = HashConfig.new("default_env", "primary", max_threads: "10") + 
assert_equal 10, config.max_threads + end + + def test_max_threads_default_uses_pool_default + config = HashConfig.new("default_env", "primary", {}) + assert_equal 5, config.pool + assert_equal 5, config.max_threads + end + + def test_max_threads_uses_pool_when_set + config = HashConfig.new("default_env", "primary", pool: 1) + assert_equal 1, config.pool + assert_equal 1, config.max_threads + end + + def test_max_queue_is_pool_multipled_by_4 + config = HashConfig.new("default_env", "primary", {}) + assert_equal 5, config.max_threads + assert_equal config.max_threads * 4, config.max_queue + end + def test_checkout_timeout_default_when_nil config = HashConfig.new("default_env", "primary", checkout_timeout: nil) assert_equal 5.0, config.checkout_timeout diff --git a/activerecord/test/support/connection.rb b/activerecord/test/support/connection.rb index b94ca9c095c..6879d591ae1 100644 --- a/activerecord/test/support/connection.rb +++ b/activerecord/test/support/connection.rb @@ -20,6 +20,7 @@ module ARTest def self.connect ActiveRecord::Base.legacy_connection_handling = false + ActiveRecord::Base.async_query_executor = :global_thread_pool puts "Using #{connection_name}" ActiveRecord::Base.logger = ActiveSupport::Logger.new("debug.log", 0, 100 * 1024 * 1024) ActiveRecord::Base.configurations = test_configuration_hashes