diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md index 5fbb20c7cb0..d2b06dea0e5 100644 --- a/activerecord/CHANGELOG.md +++ b/activerecord/CHANGELOG.md @@ -1,3 +1,17 @@ +* Allow applications to configure the thread pool for async queries + + Some applications may want one thread pool per database whereas others want to use + a single global thread pool for all queries. By default Rails will set `async_query_executor` + to `:immediate` and create a `Concurrent::ImmediateExecutor` object which is essentially a no-op. + To create one thread pool for all database connections to use applications can set + `config.active_record.async_query_executor` to `:global_thread_pool` and optionally define + `config.active_record.global_executor_concurrency`. This defaults to 4. For applications that want + to have a thread pool for each database connection, `config.active_record.async_query_executor` can + be set to `:multi_thread_pool`. The configuration for each thread pool is set in the database + configuration. + + *Eileen M. Uchitelle* + * Allow new syntax for `enum` to avoid leading `_` from reserved options. Before: diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 5670790844f..6582a6df545 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -144,12 +144,7 @@ module ActiveRecord @lock_thread = false - @async_executor = Concurrent::ThreadPoolExecutor.new( - min_threads: 0, - max_threads: @size, - max_queue: @size * 4, - fallback_policy: :caller_runs - ) + @async_executor = build_async_executor @reaper = Reaper.new(self, db_config.reaping_frequency) @reaper.run @@ -463,6 +458,22 @@ module ActiveRecord end private + def build_async_executor + case Base.async_query_executor + when :multi_thread_pool + Concurrent::ThreadPoolExecutor.new( + min_threads: @db_config.min_threads, + max_threads: @db_config.max_threads, + max_queue: @db_config.max_queue, + fallback_policy: :caller_runs + ) + when :global_thread_pool + Base.global_thread_pool_async_query_executor + else + Base.immediate_query_executor + end + end + #-- # this is unfortunately not concurrent def bulk_make_new_connections(num_new_conns_needed) diff --git a/activerecord/lib/active_record/core.rb b/activerecord/lib/active_record/core.rb index 3713da970f1..643b13670ba 100644 --- a/activerecord/lib/active_record/core.rb +++ b/activerecord/lib/active_record/core.rb @@ -157,6 +157,44 @@ module ActiveRecord mattr_accessor :application_record_class, instance_accessor: false, default: nil + # Sets the async_query_executor for an application. By default the thread pool executor + # set to `:immediate. Options are: + # + # * :immediate - Initializes a single +Concurrent::ImmediateExecutor+ + # * :global_thread_pool - Initializes a single +Concurrent::ThreadPoolExecutor+ + # that uses the +async_query_concurrency+ for the +max_threads+ value. + # * :multi_thread_pool - Initializes a +Concurrent::ThreadPoolExecutor+ for each + # database connection. The initializer values are defined in the configuration hash. + mattr_accessor :async_query_executor, instance_accessor: false, default: :immediate + + def self.immediate_query_executor # :nodoc: + @@immediate_query_executor ||= Concurrent::ImmediateExecutor.new + end + + def self.global_thread_pool_async_query_executor # :nodoc: + concurrency = global_executor_concurrency || 4 + @@global_thread_pool_async_query_executor ||= Concurrent::ThreadPoolExecutor.new( + min_threads: 0, + max_threads: concurrency, + max_queue: concurrency * 4, + fallback_policy: :caller_runs + ) + end + + # Set the +global_executor_concurrency+. This configuration value can only be used + # with the global thread pool async query executor. + def self.global_executor_concurrency=(global_executor_concurrency) + if async_query_executor == :immediate || async_query_executor == :multi_thread_pool + raise ArgumentError, "`global_executor_concurrency` cannot be set when using either immediate or multiple thread pools. For multiple thread pools, please set the concurrency in your database configuration. Immediate thread pools are essentially a no-op." + end + + @@global_executor_concurrency = global_executor_concurrency + end + + def self.global_executor_concurrency # :nodoc: + @@global_executor_concurrency ||= nil + end + def self.application_record_class? # :nodoc: if Base.application_record_class self == Base.application_record_class diff --git a/activerecord/lib/active_record/database_configurations/database_config.rb b/activerecord/lib/active_record/database_configurations/database_config.rb index 6ad9b3922e7..aeb22993f88 100644 --- a/activerecord/lib/active_record/database_configurations/database_config.rb +++ b/activerecord/lib/active_record/database_configurations/database_config.rb @@ -48,6 +48,18 @@ module ActiveRecord raise NotImplementedError end + def min_threads + raise NotImplementedError + end + + def max_threads + raise NotImplementedError + end + + def max_queue + raise NotImplementedError + end + def checkout_timeout raise NotImplementedError end diff --git a/activerecord/lib/active_record/database_configurations/hash_config.rb b/activerecord/lib/active_record/database_configurations/hash_config.rb index 66d0abb83b6..6960dfd06a6 100644 --- a/activerecord/lib/active_record/database_configurations/hash_config.rb +++ b/activerecord/lib/active_record/database_configurations/hash_config.rb @@ -66,6 +66,18 @@ module ActiveRecord (configuration_hash[:pool] || 5).to_i end + def min_threads + (configuration_hash[:min_threads] || 0).to_i + end + + def max_threads + (configuration_hash[:max_threads] || pool).to_i + end + + def max_queue + max_threads * 4 + end + def checkout_timeout (configuration_hash[:checkout_timeout] || 5).to_f end diff --git a/activerecord/test/cases/adapter_test.rb b/activerecord/test/cases/adapter_test.rb index 92f1d207af6..45fc947f7fa 100644 --- a/activerecord/test/cases/adapter_test.rb +++ b/activerecord/test/cases/adapter_test.rb @@ -343,122 +343,6 @@ module ActiveRecord end end - module AsynchronousQueriesSharedTests - def test_async_select_failure - ActiveRecord::Base.asynchronous_queries_tracker.start_session - - future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true - assert_kind_of ActiveRecord::FutureResult, future_result - assert_raises ActiveRecord::StatementInvalid do - future_result.result - end - ensure - ActiveRecord::Base.asynchronous_queries_tracker.finalize_session - end - - def test_async_query_from_transaction - ActiveRecord::Base.asynchronous_queries_tracker.start_session - - assert_nothing_raised do - @connection.select_all "SELECT * FROM posts", async: true - end - - @connection.transaction do - assert_raises AsynchronousQueryInsideTransactionError do - @connection.select_all "SELECT * FROM posts", async: true - end - end - ensure - ActiveRecord::Base.asynchronous_queries_tracker.finalize_session - end - - def test_async_query_cache - ActiveRecord::Base.asynchronous_queries_tracker.start_session - - @connection.enable_query_cache! - - @connection.select_all "SELECT * FROM posts" - result = @connection.select_all "SELECT * FROM posts", async: true - assert_equal Result, result.class - ensure - ActiveRecord::Base.asynchronous_queries_tracker.finalize_session - @connection.disable_query_cache! - end - - def test_async_query_foreground_fallback - status = {} - - subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| - if event.payload[:sql] == "SELECT * FROM does_not_exists" - status[:executed] = true - status[:async] = event.payload[:async] - end - end - - @connection.pool.stub(:schedule_query, proc { }) do - future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true - assert_kind_of ActiveRecord::FutureResult, future_result - assert_raises ActiveRecord::StatementInvalid do - future_result.result - end - end - - assert_equal true, status[:executed] - assert_equal false, status[:async] - ensure - ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber - end - end - - class AsynchronousQueriesTest < ActiveRecord::TestCase - self.use_transactional_tests = false - - include AsynchronousQueriesSharedTests - - def setup - @connection = ActiveRecord::Base.connection - end - - def test_async_select_all - ActiveRecord::Base.asynchronous_queries_tracker.start_session - status = {} - - monitor = Monitor.new - condition = monitor.new_cond - - subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| - if event.payload[:sql] == "SELECT * FROM posts" - status[:executed] = true - status[:async] = event.payload[:async] - monitor.synchronize { condition.signal } - end - end - - future_result = @connection.select_all "SELECT * FROM posts", async: true - assert_kind_of ActiveRecord::FutureResult, future_result - - monitor.synchronize do - condition.wait_until { status[:executed] } - end - assert_kind_of ActiveRecord::Result, future_result.result - assert_equal @connection.supports_concurrent_connections?, status[:async] - ensure - ActiveRecord::Base.asynchronous_queries_tracker.finalize_session - ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber - end - end - - class AsynchronousQueriesWithTransactionalTest < ActiveRecord::TestCase - self.use_transactional_tests = true - - include AsynchronousQueriesSharedTests - - def setup - @connection = ActiveRecord::Base.connection - @connection.materialize_transactions - end - end - class AdapterForeignKeyTest < ActiveRecord::TestCase self.use_transactional_tests = false diff --git a/activerecord/test/cases/asynchronous_queries_test.rb b/activerecord/test/cases/asynchronous_queries_test.rb new file mode 100644 index 00000000000..1325a7001eb --- /dev/null +++ b/activerecord/test/cases/asynchronous_queries_test.rb @@ -0,0 +1,267 @@ +# frozen_string_literal: true + +require "cases/helper" +require "support/connection_helper" +require "models/post" + +module AsynchronousQueriesSharedTests + def test_async_select_failure + ActiveRecord::Base.asynchronous_queries_tracker.start_session + + future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true + assert_kind_of ActiveRecord::FutureResult, future_result + assert_raises ActiveRecord::StatementInvalid do + future_result.result + end + ensure + ActiveRecord::Base.asynchronous_queries_tracker.finalize_session + end + + def test_async_query_from_transaction + ActiveRecord::Base.asynchronous_queries_tracker.start_session + + assert_nothing_raised do + @connection.select_all "SELECT * FROM posts", async: true + end + + @connection.transaction do + assert_raises ActiveRecord::AsynchronousQueryInsideTransactionError do + @connection.select_all "SELECT * FROM posts", async: true + end + end + ensure + ActiveRecord::Base.asynchronous_queries_tracker.finalize_session + end + + def test_async_query_cache + ActiveRecord::Base.asynchronous_queries_tracker.start_session + + @connection.enable_query_cache! + + @connection.select_all "SELECT * FROM posts" + result = @connection.select_all "SELECT * FROM posts", async: true + assert_equal ActiveRecord::Result, result.class + ensure + ActiveRecord::Base.asynchronous_queries_tracker.finalize_session + @connection.disable_query_cache! + end + + def test_async_query_foreground_fallback + status = {} + + subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| + if event.payload[:sql] == "SELECT * FROM does_not_exists" + status[:executed] = true + status[:async] = event.payload[:async] + end + end + + @connection.pool.stub(:schedule_query, proc { }) do + future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true + assert_kind_of ActiveRecord::FutureResult, future_result + assert_raises ActiveRecord::StatementInvalid do + future_result.result + end + end + + assert_equal true, status[:executed] + assert_equal false, status[:async] + ensure + ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber + end +end + +class AsynchronousQueriesTest < ActiveRecord::TestCase + self.use_transactional_tests = false + + include AsynchronousQueriesSharedTests + + def setup + @connection = ActiveRecord::Base.connection + end + + def test_async_select_all + ActiveRecord::Base.asynchronous_queries_tracker.start_session + status = {} + + monitor = Monitor.new + condition = monitor.new_cond + + subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| + if event.payload[:sql] == "SELECT * FROM posts" + status[:executed] = true + status[:async] = event.payload[:async] + monitor.synchronize { condition.signal } + end + end + + future_result = @connection.select_all "SELECT * FROM posts", async: true + assert_kind_of ActiveRecord::FutureResult, future_result + + monitor.synchronize do + condition.wait_until { status[:executed] } + end + assert_kind_of ActiveRecord::Result, future_result.result + assert_equal @connection.supports_concurrent_connections?, status[:async] + ensure + ActiveRecord::Base.asynchronous_queries_tracker.finalize_session + ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber + end +end + +class AsynchronousQueriesWithTransactionalTest < ActiveRecord::TestCase + self.use_transactional_tests = true + + include AsynchronousQueriesSharedTests + + def setup + @connection = ActiveRecord::Base.connection + @connection.materialize_transactions + end +end + +class AsynchronousExecutorTypeTest < ActiveRecord::TestCase + def test_immediate_configuration_uses_a_single_immediate_executor_by_default + old_value = ActiveRecord::Base.async_query_executor + ActiveRecord::Base.async_query_executor = :immediate + + handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new + db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary") + db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary") + pool1 = handler.establish_connection(db_config) + pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model) + + async_pool1 = pool1.instance_variable_get(:@async_executor) + async_pool2 = pool2.instance_variable_get(:@async_executor) + + assert async_pool1.is_a?(Concurrent::ImmediateExecutor) + assert async_pool2.is_a?(Concurrent::ImmediateExecutor) + + assert_equal 2, handler.all_connection_pools.count + assert_equal async_pool1, async_pool2 + ensure + clean_up_connection_handler + ActiveRecord::Base.async_query_executor = old_value + end + + def test_one_global_thread_pool_is_used_when_set_with_default_concurrency + old_value = ActiveRecord::Base.async_query_executor + ActiveRecord::Base.async_query_executor = :global_thread_pool + + handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new + db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary") + db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary") + pool1 = handler.establish_connection(db_config) + pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model) + + async_pool1 = pool1.instance_variable_get(:@async_executor) + async_pool2 = pool2.instance_variable_get(:@async_executor) + + assert async_pool1.is_a?(Concurrent::ThreadPoolExecutor) + assert async_pool2.is_a?(Concurrent::ThreadPoolExecutor) + + assert 0, async_pool1.min_length + assert 4, async_pool1.max_length + assert 16, async_pool1.max_queue + assert :caller_runs, async_pool1.fallback_policy + + assert 0, async_pool2.min_length + assert 4, async_pool2.max_length + assert 16, async_pool2.max_queue + assert :caller_runs, async_pool2.fallback_policy + + assert_equal 2, handler.all_connection_pools.count + assert_equal async_pool1, async_pool2 + ensure + clean_up_connection_handler + ActiveRecord::Base.async_query_executor = old_value + end + + def test_concurrency_can_be_set_on_global_thread_pool + old_value = ActiveRecord::Base.async_query_executor + ActiveRecord::Base.async_query_executor = :global_thread_pool + old_concurrency = ActiveRecord::Base.global_executor_concurrency + ActiveRecord::Base.global_executor_concurrency = 8 + + handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new + db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary") + db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary") + pool1 = handler.establish_connection(db_config) + pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model) + + async_pool1 = pool1.instance_variable_get(:@async_executor) + async_pool2 = pool2.instance_variable_get(:@async_executor) + + assert async_pool1.is_a?(Concurrent::ThreadPoolExecutor) + assert async_pool2.is_a?(Concurrent::ThreadPoolExecutor) + + assert 0, async_pool1.min_length + assert 8, async_pool1.max_length + assert 32, async_pool1.max_queue + assert :caller_runs, async_pool1.fallback_policy + + assert 0, async_pool2.min_length + assert 8, async_pool2.max_length + assert 32, async_pool2.max_queue + assert :caller_runs, async_pool2.fallback_policy + + assert_equal 2, handler.all_connection_pools.count + assert_equal async_pool1, async_pool2 + ensure + clean_up_connection_handler + ActiveRecord::Base.global_executor_concurrency = old_concurrency + ActiveRecord::Base.async_query_executor = old_value + end + + def test_concurrency_cannot_be_set_with_immediate_or_multi_thread_pool + old_value = ActiveRecord::Base.async_query_executor + ActiveRecord::Base.async_query_executor = :immediate + + assert_raises ArgumentError do + ActiveRecord::Base.global_executor_concurrency = 8 + end + + ActiveRecord::Base.async_query_executor = :multi_thread_pool + + assert_raises ArgumentError do + ActiveRecord::Base.global_executor_concurrency = 8 + end + ensure + ActiveRecord::Base.async_query_executor = old_value + end + + def test_one_global_thread_pool_uses_concurrency_if_set + old_value = ActiveRecord::Base.async_query_executor + ActiveRecord::Base.async_query_executor = :multi_thread_pool + + handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new + config_hash = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary").configuration_hash + new_config_hash = config_hash.merge(min_threads: 0, max_threads: 10) + db_config = ActiveRecord::DatabaseConfigurations::HashConfig.new("arunit", "primary", new_config_hash) + db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary") + pool1 = handler.establish_connection(db_config) + pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model) + + async_pool1 = pool1.instance_variable_get(:@async_executor) + async_pool2 = pool2.instance_variable_get(:@async_executor) + + assert async_pool1.is_a?(Concurrent::ThreadPoolExecutor) + assert async_pool2.is_a?(Concurrent::ThreadPoolExecutor) + + assert 0, async_pool1.min_length + assert 10, async_pool1.max_length + assert 40, async_pool1.max_queue + assert :caller_runs, async_pool1.fallback_policy + + assert 0, async_pool2.min_length + assert 4, async_pool2.max_length + assert 16, async_pool2.max_queue + assert :caller_runs, async_pool2.fallback_policy + + assert_equal 2, handler.all_connection_pools.count + assert_not_equal async_pool1, async_pool2 + ensure + clean_up_connection_handler + ActiveRecord::Base.async_query_executor = old_value + end +end diff --git a/activerecord/test/cases/database_configurations/hash_config_test.rb b/activerecord/test/cases/database_configurations/hash_config_test.rb index 1eb0bc7ee5f..f7991d58668 100644 --- a/activerecord/test/cases/database_configurations/hash_config_test.rb +++ b/activerecord/test/cases/database_configurations/hash_config_test.rb @@ -20,6 +20,39 @@ module ActiveRecord assert_equal 5, config.pool end + def test_min_threads_with_value + config = HashConfig.new("default_env", "primary", min_threads: "1") + assert_equal 1, config.min_threads + end + + def test_min_threads_default + config = HashConfig.new("default_env", "primary", {}) + assert_equal 0, config.min_threads + end + + def test_max_threads_with_value + config = HashConfig.new("default_env", "primary", max_threads: "10") + assert_equal 10, config.max_threads + end + + def test_max_threads_default_uses_pool_default + config = HashConfig.new("default_env", "primary", {}) + assert_equal 5, config.pool + assert_equal 5, config.max_threads + end + + def test_max_threads_uses_pool_when_set + config = HashConfig.new("default_env", "primary", pool: 1) + assert_equal 1, config.pool + assert_equal 1, config.max_threads + end + + def test_max_queue_is_pool_multipled_by_4 + config = HashConfig.new("default_env", "primary", {}) + assert_equal 5, config.max_threads + assert_equal config.max_threads * 4, config.max_queue + end + def test_checkout_timeout_default_when_nil config = HashConfig.new("default_env", "primary", checkout_timeout: nil) assert_equal 5.0, config.checkout_timeout diff --git a/activerecord/test/support/connection.rb b/activerecord/test/support/connection.rb index b94ca9c095c..6879d591ae1 100644 --- a/activerecord/test/support/connection.rb +++ b/activerecord/test/support/connection.rb @@ -20,6 +20,7 @@ module ARTest def self.connect ActiveRecord::Base.legacy_connection_handling = false + ActiveRecord::Base.async_query_executor = :global_thread_pool puts "Using #{connection_name}" ActiveRecord::Base.logger = ActiveSupport::Logger.new("debug.log", 0, 100 * 1024 * 1024) ActiveRecord::Base.configurations = test_configuration_hashes