From 1dcb41142953e12c308900159893ddd74aaa0026 Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Wed, 7 Feb 2024 10:50:17 +0100 Subject: [PATCH] Decouple transactional fixtures and active connections Ref: https://github.com/rails/rails/pull/50793 Transactional fixtures are currently tightly coupled with the pool active connection. It assumes calling `pool.connection` will memoize the checked out connection and leverage that to start a transaction on it and ensure all subsequent accesses will get the same connection. To allow to remove checkout caching (or make it optional), we first must decouple transactional fixtures to not rely on it. The idea is to behave similarly, but store the connection in the pool as a special "pinned" connection, and not as the regular active connection. This allows to always return the same pinned connection, but without necessarily assigning it as the active connection. Additionally, this pinning impact all threads and fibers, so that all threads have a consistent view of the database state. --- .../abstract/connection_pool.rb | 78 +++++++++++++------ .../abstract/query_cache.rb | 6 +- .../connection_adapters/abstract_adapter.rb | 10 +-- .../lib/active_record/test_fixtures.rb | 55 +++++-------- .../abstract_mysql_adapter/connection_test.rb | 2 +- .../adapters/postgresql/connection_test.rb | 2 +- .../connection_handler_test.rb | 6 -- .../test/cases/connection_pool_test.rb | 65 +++++++++++++++- activerecord/test/cases/fixtures_test.rb | 43 ++++++---- .../test/cases/invalid_connection_test.rb | 3 +- activerecord/test/cases/query_cache_test.rb | 28 +++---- activerecord/test/cases/test_fixtures_test.rb | 3 + railties/test/application/test_runner_test.rb | 5 +- 13 files changed, 197 insertions(+), 109 deletions(-) diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index ac4a72b5ab0..7de0d12d08b 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -162,8 +162,7 @@ module ActiveRecord @threads_blocking_new_connections = 0 @available = ConnectionLeasingQueue.new self - - @lock_thread = false + @pinned_connection = nil @async_executor = build_async_executor @@ -171,25 +170,48 @@ module ActiveRecord @reaper.run end - def lock_thread=(lock_thread) - if lock_thread - @lock_thread = ActiveSupport::IsolatedExecutionState.context - else - @lock_thread = nil - end - - if (active_connection = @thread_cached_conns[connection_cache_key(current_thread)]) - active_connection.lock_thread = @lock_thread - end - end - # Retrieve the connection associated with the current thread, or call # #checkout to obtain one if necessary. # # #connection can be called any number of times; the connection is # held in a cache keyed by a thread. def connection - @thread_cached_conns[connection_cache_key(current_thread)] ||= checkout + @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] ||= checkout + end + + def pin_connection!(lock_thread) # :nodoc: + raise "There is already a pinned connection" if @pinned_connection + + @pinned_connection = (@thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] || checkout) + # Any leased connection must be in @connections otherwise + # some methods like #connected? won't behave correctly + unless @connections.include?(@pinned_connection) + @connections << @pinned_connection + end + + @pinned_connection.lock_thread = ActiveSupport::IsolatedExecutionState.context if lock_thread + @pinned_connection.verify! # eagerly validate the connection + @pinned_connection.begin_transaction joinable: false, _lazy: false + end + + def unpin_connection! # :nodoc: + raise "There isn't a pinned connection #{object_id}" unless @pinned_connection + + clean = true + @pinned_connection.lock.synchronize do + connection, @pinned_connection = @pinned_connection, nil + if connection.transaction_open? + connection.rollback_transaction + else + # Something committed or rolled back the transaction + clean = false + connection.reset! + end + connection.lock_thread = nil + checkin(connection) + end + + clean end def connection_class # :nodoc: @@ -204,7 +226,7 @@ module ActiveRecord # #connection or #with_connection methods. Connections obtained through # #checkout will not be detected by #active_connection? def active_connection? - @thread_cached_conns[connection_cache_key(current_thread)] + @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] end # Signal that the thread is finished with the current connection. @@ -276,6 +298,7 @@ module ActiveRecord conn.disconnect! end @connections = [] + @thread_cached_conns.clear @available.clear end end @@ -360,9 +383,19 @@ module ActiveRecord # Raises: # - ActiveRecord::ConnectionTimeoutError no connection can be obtained from the pool. def checkout(checkout_timeout = @checkout_timeout) - connection = checkout_and_verify(acquire_connection(checkout_timeout)) - connection.lock_thread = @lock_thread - connection + if @pinned_connection + synchronize do + @pinned_connection.verify! + # Any leased connection must be in @connections otherwise + # some methods like #connected? won't behave correctly + unless @connections.include?(@pinned_connection) + @connections << @pinned_connection + end + end + @pinned_connection + else + checkout_and_verify(acquire_connection(checkout_timeout)) + end end # Check-in a database connection back into the pool, indicating that you @@ -371,6 +404,8 @@ module ActiveRecord # +conn+: an AbstractAdapter object, which was obtained by earlier by # calling #checkout on this pool. def checkin(conn) + return if @pinned_connection.equal?(conn) + conn.lock.synchronize do synchronize do remove_connection_from_thread_cache conn @@ -379,7 +414,6 @@ module ActiveRecord conn.expire end - conn.lock_thread = nil @available.add conn end end @@ -533,10 +567,6 @@ module ActiveRecord thread end - def current_thread - @lock_thread || ActiveSupport::IsolatedExecutionState.context - end - # Take control of all existing connections so a "group" action such as # reload/disconnect can be performed safely. It is no longer enough to # wrap it in +synchronize+ because some pool's actions are allowed diff --git a/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb b/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb index 6871300b8c0..88c3240f860 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/query_cache.rb @@ -36,17 +36,17 @@ module ActiveRecord end def enable_query_cache! - @query_cache_enabled[connection_cache_key(current_thread)] = true + @query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] = true connection.enable_query_cache! if active_connection? end def disable_query_cache! - @query_cache_enabled.delete connection_cache_key(current_thread) + @query_cache_enabled.delete connection_cache_key(ActiveSupport::IsolatedExecutionState.context) connection.disable_query_cache! if active_connection? end def query_cache_enabled - @query_cache_enabled[connection_cache_key(current_thread)] + @query_cache_enabled[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] end end diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb index af23562dba8..7b6154473e9 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb @@ -174,19 +174,13 @@ module ActiveRecord @verified = false end - THREAD_LOCK = ActiveSupport::Concurrency::ThreadLoadInterlockAwareMonitor.new - private_constant :THREAD_LOCK - - FIBER_LOCK = ActiveSupport::Concurrency::LoadInterlockAwareMonitor.new - private_constant :FIBER_LOCK - def lock_thread=(lock_thread) # :nodoc: @lock = case lock_thread when Thread - THREAD_LOCK + ActiveSupport::Concurrency::ThreadLoadInterlockAwareMonitor.new when Fiber - FIBER_LOCK + ActiveSupport::Concurrency::LoadInterlockAwareMonitor.new else ActiveSupport::Concurrency::NullLock end diff --git a/activerecord/lib/active_record/test_fixtures.rb b/activerecord/lib/active_record/test_fixtures.rb index 759c3141a6e..ced214f0968 100644 --- a/activerecord/lib/active_record/test_fixtures.rb +++ b/activerecord/lib/active_record/test_fixtures.rb @@ -131,8 +131,8 @@ module ActiveRecord end @fixture_cache = {} - @fixture_connections = {} @fixture_cache_key = [self.class.fixture_table_names.dup, self.class.fixture_paths.dup, self.class.fixture_class_names.dup] + @fixture_connection_pools = [] @@already_loaded_fixtures ||= {} @connection_subscriber = nil @saved_pool_configs = Hash.new { |hash, key| hash[key] = {} } @@ -163,20 +163,24 @@ module ActiveRecord teardown_transactional_fixtures else ActiveRecord::FixtureSet.reset_cache + invalidate_already_loaded_fixtures end ActiveRecord::Base.connection_handler.clear_active_connections!(:all) end + def invalidate_already_loaded_fixtures + @@already_loaded_fixtures.clear + end + def setup_transactional_fixtures setup_shared_connection_pool # Begin transactions for connections already established - @fixture_connections = ActiveRecord::Base.connection_handler.connection_pool_list(:writing).to_h do |pool| - pool.lock_thread = true if lock_threads - connection = pool.connection - transaction = connection.begin_transaction(joinable: false, _lazy: false) - [connection, transaction] + @fixture_connection_pools = ActiveRecord::Base.connection_handler.connection_pool_list(:writing) + @fixture_connection_pools.each do |pool| + pool.pin_connection!(lock_threads) + pool.connection end # When connections are established in the future, begin a transaction too @@ -185,19 +189,14 @@ module ActiveRecord shard = payload[:shard] if payload.key?(:shard) if connection_name - begin - connection = ActiveRecord::Base.connection_handler.retrieve_connection(connection_name, shard: shard) - connection.connect! # eagerly validate the connection - rescue ConnectionNotEstablished - connection = nil - end - - if connection + pool = ActiveRecord::Base.connection_handler.retrieve_connection_pool(connection_name, shard: shard) + if pool setup_shared_connection_pool - if !@fixture_connections.key?(connection) - connection.pool.lock_thread = true if lock_threads - @fixture_connections[connection] = connection.begin_transaction(joinable: false, _lazy: false) + unless @fixture_connection_pools.include?(pool) + pool.pin_connection!(lock_threads) + pool.connection + @fixture_connection_pools << pool end end end @@ -206,27 +205,15 @@ module ActiveRecord def teardown_transactional_fixtures ActiveSupport::Notifications.unsubscribe(@connection_subscriber) if @connection_subscriber - @fixture_connections.each do |connection, transaction| - begin - connection.rollback_transaction(transaction) - rescue ActiveRecord::StatementInvalid - # Something commited or rolled back the transaction. - # We can no longer trust the database state is clean. - invalidate_already_loaded_fixtures - # We also don't know for sure the connection wasn't - # mutated in dangerous ways. - connection.disconnect! - end - connection.pool.lock_thread = false + unless @fixture_connection_pools.map(&:unpin_connection!).all? + # Something caused the transaction to be committed or rolled back + # We can no longer trust the database is in a clean state. + @@already_loaded_fixtures.clear end - @fixture_connections.clear + @fixture_connection_pools.clear teardown_shared_connection_pool end - def invalidate_already_loaded_fixtures - @@already_loaded_fixtures.clear - end - # Shares the writing connection pool with connections on # other handlers. # diff --git a/activerecord/test/cases/adapters/abstract_mysql_adapter/connection_test.rb b/activerecord/test/cases/adapters/abstract_mysql_adapter/connection_test.rb index a77edcff820..f06c970b81c 100644 --- a/activerecord/test/cases/adapters/abstract_mysql_adapter/connection_test.rb +++ b/activerecord/test/cases/adapters/abstract_mysql_adapter/connection_test.rb @@ -37,7 +37,7 @@ class ConnectionTest < ActiveRecord::AbstractMysqlTestCase assert_not_predicate @connection, :active? ensure # Repair all fixture connections so other tests won't break. - @fixture_connections.each_key(&:verify!) + @fixture_connection_pools.each { |p| p.connection.verify! } end def test_successful_reconnection_after_timeout_with_manual_reconnect diff --git a/activerecord/test/cases/adapters/postgresql/connection_test.rb b/activerecord/test/cases/adapters/postgresql/connection_test.rb index af40c4c72a2..ced2dfa3842 100644 --- a/activerecord/test/cases/adapters/postgresql/connection_test.rb +++ b/activerecord/test/cases/adapters/postgresql/connection_test.rb @@ -145,7 +145,7 @@ module ActiveRecord assert_predicate @connection, :active? ensure # Repair all fixture connections so other tests won't break. - @fixture_connections.each_key(&:verify!) + @fixture_connection_pools.each { |p| p.connection.verify! } end def test_set_session_variable_true diff --git a/activerecord/test/cases/connection_adapters/connection_handler_test.rb b/activerecord/test/cases/connection_adapters/connection_handler_test.rb index 9f2b0c35c18..395e6995068 100644 --- a/activerecord/test/cases/connection_adapters/connection_handler_test.rb +++ b/activerecord/test/cases/connection_adapters/connection_handler_test.rb @@ -6,8 +6,6 @@ require "models/person" module ActiveRecord module ConnectionAdapters class ConnectionHandlerTest < ActiveRecord::TestCase - self.use_transactional_tests = false - fixtures :people def setup @@ -95,8 +93,6 @@ module ActiveRecord connection_handler = ActiveRecord::Base.connection_handler ActiveRecord::Base.connection_handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new - setup_transactional_fixtures - assert_nothing_raised do ActiveRecord::Base.connects_to(database: { reading: :arunit, writing: :arunit }) end @@ -105,8 +101,6 @@ module ActiveRecord ro_conn = ActiveRecord::Base.connection_handler.retrieve_connection("ActiveRecord::Base", role: :reading) assert_equal rw_conn, ro_conn - - teardown_transactional_fixtures ensure ActiveRecord::Base.connection_handler = connection_handler end diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb index a4daec2850b..59f29ee9ac3 100644 --- a/activerecord/test/cases/connection_pool_test.rb +++ b/activerecord/test/cases/connection_pool_test.rb @@ -845,6 +845,69 @@ module ActiveRecord assert_equal :shard_one, pool.connection.shard end + def test_pin_connection_always_returns_the_same_connection + assert_not_predicate @pool, :active_connection? + @pool.pin_connection!(true) + pinned_connection = @pool.checkout + + assert_not_predicate @pool, :active_connection? + assert_same pinned_connection, @pool.connection + assert_predicate @pool, :active_connection? + + assert_same pinned_connection, @pool.checkout + + @pool.release_connection + assert_not_predicate @pool, :active_connection? + assert_same pinned_connection, @pool.checkout + end + + def test_pin_connection_connected? + skip("Can't test with in-memory dbs") if in_memory_db? + + assert_not_predicate @pool, :connected? + @pool.pin_connection!(true) + assert_predicate @pool, :connected? + + pin_connection = @pool.checkout + + @pool.disconnect + assert_not_predicate @pool, :connected? + assert_same pin_connection, @pool.checkout + assert_predicate @pool, :connected? + end + + def test_pin_connection_synchronize_the_connection + assert_equal ActiveSupport::Concurrency::NullLock, @pool.connection.lock + @pool.pin_connection!(true) + assert_not_equal ActiveSupport::Concurrency::NullLock, @pool.connection.lock + @pool.unpin_connection! + assert_equal ActiveSupport::Concurrency::NullLock, @pool.connection.lock + + @pool.pin_connection!(false) + assert_equal ActiveSupport::Concurrency::NullLock, @pool.connection.lock + end + + def test_pin_connection_opens_a_transaction + assert_instance_of NullTransaction, @pool.connection.current_transaction + @pool.pin_connection!(true) + assert_instance_of RealTransaction, @pool.connection.current_transaction + @pool.unpin_connection! + assert_instance_of NullTransaction, @pool.connection.current_transaction + end + + def test_unpin_connection_returns_whether_transaction_has_been_rolledback + @pool.pin_connection!(true) + assert_equal true, @pool.unpin_connection! + + @pool.pin_connection!(true) + @pool.connection.commit_transaction + assert_equal false, @pool.unpin_connection! + + @pool.pin_connection!(true) + @pool.connection.rollback_transaction + assert_equal false, @pool.unpin_connection! + end + private def with_single_connection_pool config = @db_config.configuration_hash.merge(pool: 1) @@ -871,8 +934,8 @@ module ActiveRecord end def test_lock_thread_allow_fiber_reentrency - @pool.lock_thread = true connection = @pool.checkout + connection.lock_thread = ActiveSupport::IsolatedExecutionState.context connection.transaction do enumerator = Enumerator.new do |yielder| connection.transaction do diff --git a/activerecord/test/cases/fixtures_test.rb b/activerecord/test/cases/fixtures_test.rb index 3d25e807727..9e463ca3f1b 100644 --- a/activerecord/test/cases/fixtures_test.rb +++ b/activerecord/test/cases/fixtures_test.rb @@ -1050,12 +1050,16 @@ class TransactionalFixturesOnConnectionNotification < ActiveRecord::TestCase def connect!; end end.new - connection.pool = Class.new do - def lock_thread=(lock_thread); end - end.new + pool = connection.pool = Class.new do + attr_reader :connection + def initialize(connection); @connection = connection; end + def release_connection; end + def pin_connection!(_); end + def unpin_connection!; @connection.rollback_transaction; true; end + end.new(connection) - assert_called_with(connection, :begin_transaction, [], joinable: false, _lazy: false) do - fire_connection_notification(connection) + assert_called_with(pool, :pin_connection!, [true]) do + fire_connection_notification(connection.pool) end end @@ -1069,14 +1073,19 @@ class TransactionalFixturesOnConnectionNotification < ActiveRecord::TestCase def rollback_transaction(*args) @rollback_transaction_called = true end + def lock_thread=(lock_thread); end def connect!; end end.new connection.pool = Class.new do - def lock_thread=(lock_thread); end - end.new + attr_reader :connection + def initialize(connection); @connection = connection; end + def release_connection; end + def pin_connection!(_); end + def unpin_connection!; @connection.rollback_transaction; true; end + end.new(connection) - fire_connection_notification(connection) + fire_connection_notification(connection.pool) teardown_fixtures assert(connection.rollback_transaction_called, "Expected #rollback_transaction to be called but was not") @@ -1093,17 +1102,21 @@ class TransactionalFixturesOnConnectionNotification < ActiveRecord::TestCase end.new connection.pool = Class.new do - def lock_thread=(lock_thread); end - end.new + attr_reader :connection + def initialize(connection); @connection = connection; end + def release_connection; end + def pin_connection!(_); end + def unpin_connection!; @connection.rollback_transaction; true; end + end.new(connection) - assert_called_with(connection, :begin_transaction, [], joinable: false, _lazy: false) do - fire_connection_notification(connection, shard: :shard_two) + assert_called_with(connection.pool, :pin_connection!, [true]) do + fire_connection_notification(connection.pool, shard: :shard_two) end end private - def fire_connection_notification(connection, shard: ActiveRecord::Base.default_shard) - assert_called_with(ActiveRecord::Base.connection_handler, :retrieve_connection, ["book"], returns: connection, shard: shard) do + def fire_connection_notification(pool, shard: ActiveRecord::Base.default_shard) + assert_called_with(ActiveRecord::Base.connection_handler, :retrieve_connection_pool, ["book"], returns: pool, shard: shard) do message_bus = ActiveSupport::Notifications.instrumenter payload = { connection_name: "book", @@ -1152,7 +1165,7 @@ end class FixturesBrokenRollbackTest < ActiveRecord::TestCase def blank_setup - @fixture_connections = [ActiveRecord::Base.connection] + @fixture_connection_pools = [ActiveRecord::Base.connection_pool] end alias_method :ar_setup_fixtures, :setup_fixtures alias_method :setup_fixtures, :blank_setup diff --git a/activerecord/test/cases/invalid_connection_test.rb b/activerecord/test/cases/invalid_connection_test.rb index 54d01d1c3eb..747eb029500 100644 --- a/activerecord/test/cases/invalid_connection_test.rb +++ b/activerecord/test/cases/invalid_connection_test.rb @@ -3,8 +3,9 @@ require "cases/helper" class TestAdapterWithInvalidConnection < ActiveRecord::TestCase - if current_adapter?(:Mysql2Adapter, :TrilogyAdapter) + self.use_transactional_tests = false + if current_adapter?(:Mysql2Adapter, :TrilogyAdapter) class Bird < ActiveRecord::Base end diff --git a/activerecord/test/cases/query_cache_test.rb b/activerecord/test/cases/query_cache_test.rb index 89473b73b27..c8c01d1635d 100644 --- a/activerecord/test/cases/query_cache_test.rb +++ b/activerecord/test/cases/query_cache_test.rb @@ -634,22 +634,24 @@ class QueryCacheTest < ActiveRecord::TestCase end test "query cache is enabled in threads with shared connection" do - ActiveRecord::Base.connection_pool.lock_thread = true + ActiveRecord::Base.connection_pool.pin_connection!(ActiveSupport::IsolatedExecutionState.context) - assert_cache :off - ActiveRecord::Base.connection.enable_query_cache! - assert_cache :clean + begin + assert_cache :off + ActiveRecord::Base.connection.enable_query_cache! + assert_cache :clean - thread_a = Thread.new do - middleware { |env| - assert_cache :clean - [200, {}, nil] - }.call({}) + thread_a = Thread.new do + middleware { |env| + assert_cache :clean + [200, {}, nil] + }.call({}) + end + + thread_a.join + ensure + ActiveRecord::Base.connection_pool.unpin_connection! end - - thread_a.join - - ActiveRecord::Base.connection_pool.lock_thread = false end private diff --git a/activerecord/test/cases/test_fixtures_test.rb b/activerecord/test/cases/test_fixtures_test.rb index f53943d45ff..3055266515f 100644 --- a/activerecord/test/cases/test_fixtures_test.rb +++ b/activerecord/test/cases/test_fixtures_test.rb @@ -56,6 +56,9 @@ class TestFixturesTest < ActiveRecord::TestCase end end + ActiveSupport::Notifications.unsubscribe(@connection_subscriber) + @connection_subscriber = nil + old_handler = ActiveRecord::Base.connection_handler ActiveRecord::Base.connection_handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new ActiveRecord::Base.establish_connection(:arunit) diff --git a/railties/test/application/test_runner_test.rb b/railties/test/application/test_runner_test.rb index c36a4cb1473..25be6b2ba3d 100644 --- a/railties/test/application/test_runner_test.rb +++ b/railties/test/application/test_runner_test.rb @@ -898,7 +898,7 @@ module ApplicationTests end def test_run_in_parallel_with_threads - exercise_parallelization_regardless_of_machine_core_count(with: :threads) + exercise_parallelization_regardless_of_machine_core_count(with: :threads, transactional_fixtures: false) file_name = create_parallel_threads_test_file @@ -1390,7 +1390,7 @@ module ApplicationTests RUBY end - def exercise_parallelization_regardless_of_machine_core_count(with:, threshold: 0) + def exercise_parallelization_regardless_of_machine_core_count(with:, threshold: 0, transactional_fixtures: true) file_content = ERB.new(<<-ERB, trim_mode: "-").result_with_hash(with: with.to_s) ENV["RAILS_ENV"] ||= "test" require_relative "../config/environment" @@ -1399,6 +1399,7 @@ module ApplicationTests class ActiveSupport::TestCase # Run tests in parallel with specified workers parallelize(workers: 2, with: :<%= with %>, threshold: #{threshold}) + self.use_transactional_tests = #{transactional_fixtures} # Setup all fixtures in test/fixtures/*.yml for all tests in alphabetical order. fixtures :all