Add new tests for deferred connection verification and auto-reconnect

This commit is contained in:
Matthew Draper 2022-03-23 22:52:04 +10:30
parent 6693e5fc9a
commit 57bc28f728
14 changed files with 304 additions and 165 deletions

View File

@ -389,6 +389,8 @@ module ActiveRecord
# done if the transaction block raises an exception or returns false.
def rollback_db_transaction
exec_rollback_db_transaction
rescue ActiveRecord::ConnectionNotEstablished, ActiveRecord::ConnectionFailed
reconnect!
end
def exec_rollback_db_transaction() end # :nodoc:
@ -478,6 +480,10 @@ module ActiveRecord
end
private
def internal_execute(sql, name = "SCHEMA")
execute(sql, name)
end
def execute_batch(statements, name = nil)
statements.each do |statement|
execute(statement, name)

View File

@ -8,15 +8,15 @@ module ActiveRecord
end
def create_savepoint(name = current_savepoint_name)
execute("SAVEPOINT #{name}", "TRANSACTION")
internal_execute("SAVEPOINT #{name}", "TRANSACTION")
end
def exec_rollback_to_savepoint(name = current_savepoint_name)
execute("ROLLBACK TO SAVEPOINT #{name}", "TRANSACTION")
internal_execute("ROLLBACK TO SAVEPOINT #{name}", "TRANSACTION")
end
def release_savepoint(name = current_savepoint_name)
execute("RELEASE SAVEPOINT #{name}", "TRANSACTION")
internal_execute("RELEASE SAVEPOINT #{name}", "TRANSACTION")
end
end
end

View File

@ -403,24 +403,24 @@ module ActiveRecord
def materialize_transactions
return if @materializing_transactions
if @has_unmaterialized_transactions
@connection.lock.synchronize do
begin
@materializing_transactions = true
@stack.each { |t| t.materialize! unless t.materialized? }
ensure
@materializing_transactions = false
end
@has_unmaterialized_transactions = false
end
end
# As a logical simplification for now, we assume anything that requests
# materialization is about to dirty the transaction. Note this is just
# an assumption about the caller, not a direct property of this method.
# It can go away later when callers are able to handle dirtiness for
# themselves.
dirty_current_transaction
return unless @has_unmaterialized_transactions
@connection.lock.synchronize do
begin
@materializing_transactions = true
@stack.each { |t| t.materialize! unless t.materialized? }
ensure
@materializing_transactions = false
end
@has_unmaterialized_transactions = false
end
end
def commit_transaction

View File

@ -917,7 +917,7 @@ module ActiveRecord
end
def retryable_connection_error?(exception)
exception.is_a?(ConnectionNotEstablished)
exception.is_a?(ConnectionNotEstablished) || exception.is_a?(ConnectionFailed)
end
def retryable_query_error?(exception)

View File

@ -224,15 +224,15 @@ module ActiveRecord
end
def commit_db_transaction # :nodoc:
internal_execute("COMMIT", "TRANSACTION")
internal_execute("COMMIT", "TRANSACTION", allow_retry: false, uses_transaction: true)
end
def exec_rollback_db_transaction # :nodoc:
internal_execute("ROLLBACK", "TRANSACTION")
internal_execute("ROLLBACK", "TRANSACTION", allow_retry: false, uses_transaction: true)
end
def exec_restart_db_transaction # :nodoc:
internal_execute("ROLLBACK AND CHAIN", "TRANSACTION")
internal_execute("ROLLBACK AND CHAIN", "TRANSACTION", allow_retry: false, uses_transaction: true)
end
def empty_insert_statement_value(primary_key = nil) # :nodoc:
@ -679,8 +679,12 @@ module ActiveRecord
ER_CANNOT_CREATE_TABLE = 1005
ER_LOCK_WAIT_TIMEOUT = 1205
ER_QUERY_INTERRUPTED = 1317
ER_CONNECTION_KILLED = 1927
CR_SERVER_GONE_ERROR = 2006
CR_SERVER_LOST = 2013
ER_QUERY_TIMEOUT = 3024
ER_FK_INCOMPATIBLE_COLUMNS = 3780
ER_CLIENT_INTERACTION_TIMEOUT = 4031
def translate_exception(exception, message:, sql:, binds:)
case error_number(exception)
@ -690,6 +694,8 @@ module ActiveRecord
else
super
end
when ER_CONNECTION_KILLED, CR_SERVER_GONE_ERROR, CR_SERVER_LOST, ER_CLIENT_INTERACTION_TIMEOUT
ConnectionFailed.new(message, sql: sql, binds: binds)
when ER_DB_CREATE_EXISTS
DatabaseAlreadyExists.new(message, sql: sql, binds: binds)
when ER_DUP_ENTRY

View File

@ -173,6 +173,12 @@ module ActiveRecord
def translate_exception(exception, message:, sql:, binds:)
if exception.is_a?(Mysql2::Error::TimeoutError) && !exception.error_number
ActiveRecord::AdapterTimeout.new(message, sql: sql, binds: binds)
elsif exception.is_a?(Mysql2::Error::ConnectionError)
if exception.message.match?(/MySQL client is not connected/i)
ActiveRecord::ConnectionNotEstablished.new(exception)
else
ActiveRecord::ConnectionFailed.new(message, sql: sql, binds: binds)
end
else
super
end

View File

@ -125,24 +125,18 @@ module ActiveRecord
# Commits a transaction.
def commit_db_transaction # :nodoc:
internal_execute("COMMIT", "TRANSACTION")
internal_execute("COMMIT", "TRANSACTION", allow_retry: false, uses_transaction: true)
end
# Aborts a transaction.
def exec_rollback_db_transaction # :nodoc:
if @raw_connection
@raw_connection.cancel unless @raw_connection.transaction_status == PG::PQTRANS_IDLE
@raw_connection.block
end
internal_execute("ROLLBACK", "TRANSACTION")
cancel_any_running_query
internal_execute("ROLLBACK", "TRANSACTION", allow_retry: false, uses_transaction: true)
end
def exec_restart_db_transaction # :nodoc:
if @raw_connection
@raw_connection.cancel unless @raw_connection.transaction_status == PG::PQTRANS_IDLE
@raw_connection.block
end
internal_execute("ROLLBACK AND CHAIN", "TRANSACTION")
cancel_any_running_query
internal_execute("ROLLBACK AND CHAIN", "TRANSACTION", allow_retry: false, uses_transaction: true)
end
# From https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-CURRENT
@ -154,6 +148,13 @@ module ActiveRecord
end
private
def cancel_any_running_query
return unless @raw_connection && @raw_connection.transaction_status != PG::PQTRANS_IDLE
@raw_connection.cancel
@raw_connection.block
rescue PG::Error
end
def execute_batch(statements, name = nil)
execute(combine_multi_statements(statements))
end

View File

@ -680,8 +680,17 @@ module ActiveRecord
when nil
if exception.message.match?(/connection is closed/i)
ConnectionNotEstablished.new(exception)
elsif exception.is_a?(PG::ConnectionBad) && !exception.message.end_with?("\n")
ConnectionNotEstablished.new(exception)
elsif exception.is_a?(PG::ConnectionBad)
# libpq message style always ends with a newline; the pg gem's internal
# errors do not. We separate these cases because a pg-internal
# ConnectionBad means it failed before it managed to send the query,
# whereas a libpq failure could have occurred at any time (meaning the
# server may have already executed part or all of the query).
if exception.message.end_with?("\n")
ConnectionFailed.new(exception)
else
ConnectionNotEstablished.new(exception)
end
else
super
end

View File

@ -493,6 +493,11 @@ module ActiveRecord
class AdapterTimeout < QueryAborted
end
# ConnectionFailed will be raised when the network connection to the
# database fails while sending a query or waiting for its result.
class ConnectionFailed < QueryAborted
end
# UnknownAttributeReference is raised when an unknown and potentially unsafe
# value is passed to a query method. For example, passing a non column name
# value to a relation's #order method might cause this exception.

View File

@ -376,91 +376,6 @@ module ActiveRecord
end
unless in_memory_db?
test "reconnect after a disconnect" do
assert_predicate @connection, :active?
@connection.disconnect!
assert_not_predicate @connection, :active?
@connection.reconnect!
assert_predicate @connection, :active?
end
test "materialized transaction state is reset after a reconnect" do
@connection.begin_transaction
assert_predicate @connection, :transaction_open?
@connection.materialize_transactions
assert raw_transaction_open?(@connection)
@connection.reconnect!
assert_not_predicate @connection, :transaction_open?
assert_not raw_transaction_open?(@connection)
end
test "materialized transaction state can be restored after a reconnect" do
@connection.begin_transaction
assert_predicate @connection, :transaction_open?
# +materialize_transactions+ currently automatically dirties the
# connection, which would make it unrestorable
@connection.transaction_manager.stub(:dirty_current_transaction, nil) do
@connection.materialize_transactions
end
assert raw_transaction_open?(@connection)
@connection.reconnect!(restore_transactions: true)
assert_predicate @connection, :transaction_open?
assert raw_transaction_open?(@connection)
ensure
@connection.reconnect!
assert_not_predicate @connection, :transaction_open?
end
test "materialized transaction state is reset after a disconnect" do
@connection.begin_transaction
assert_predicate @connection, :transaction_open?
@connection.materialize_transactions
assert raw_transaction_open?(@connection)
@connection.disconnect!
assert_not_predicate @connection, :transaction_open?
ensure
@connection.reconnect!
assert_not raw_transaction_open?(@connection)
end
test "unmaterialized transaction state is reset after a reconnect" do
@connection.begin_transaction
assert_predicate @connection, :transaction_open?
assert_not raw_transaction_open?(@connection)
@connection.reconnect!
assert_not_predicate @connection, :transaction_open?
assert_not raw_transaction_open?(@connection)
@connection.materialize_transactions
assert_not raw_transaction_open?(@connection)
end
test "unmaterialized transaction state can be restored after a reconnect" do
@connection.begin_transaction
assert_predicate @connection, :transaction_open?
assert_not raw_transaction_open?(@connection)
@connection.reconnect!(restore_transactions: true)
assert_predicate @connection, :transaction_open?
assert_not raw_transaction_open?(@connection)
@connection.materialize_transactions
assert raw_transaction_open?(@connection)
ensure
@connection.reconnect!
assert_not_predicate @connection, :transaction_open?
assert_not raw_transaction_open?(@connection)
end
test "unmaterialized transaction state is reset after a disconnect" do
@connection.begin_transaction
assert_predicate @connection, :transaction_open?
assert_not raw_transaction_open?(@connection)
@connection.disconnect!
assert_not_predicate @connection, :transaction_open?
ensure
@connection.reconnect!
assert_not raw_transaction_open?(@connection)
@connection.materialize_transactions
assert_not raw_transaction_open?(@connection)
end
end
def test_create_with_query_cache
@ -551,31 +466,6 @@ module ActiveRecord
end
private
def raw_transaction_open?(connection)
case connection.class::ADAPTER_NAME
when "PostgreSQL"
connection.instance_variable_get(:@raw_connection).transaction_status == ::PG::PQTRANS_INTRANS
when "Mysql2"
begin
connection.instance_variable_get(:@raw_connection).query("SAVEPOINT transaction_test")
connection.instance_variable_get(:@raw_connection).query("RELEASE SAVEPOINT transaction_test")
true
rescue
false
end
when "SQLite"
begin
connection.instance_variable_get(:@raw_connection).transaction { nil }
false
rescue
true
end
else
skip
end
end
def reset_fixtures(*fixture_names)
ActiveRecord::FixtureSet.reset_cache
@ -584,6 +474,225 @@ module ActiveRecord
end
end
end
unless in_memory_db?
class AdapterConnectionTest < ActiveRecord::TestCase
self.use_transactional_tests = false
fixtures :posts, :authors, :author_addresses
def setup
@connection = ActiveRecord::Base.connection
assert_predicate @connection, :active?
end
def teardown
@connection.reconnect!
assert_predicate @connection, :active?
assert_not_predicate @connection, :transaction_open?
assert_not raw_transaction_open?(@connection)
end
test "reconnect after a disconnect" do
@connection.disconnect!
assert_not_predicate @connection, :active?
@connection.reconnect!
assert_predicate @connection, :active?
end
test "materialized transaction state is reset after a reconnect" do
@connection.begin_transaction
assert_predicate @connection, :transaction_open?
@connection.materialize_transactions
assert raw_transaction_open?(@connection)
@connection.reconnect!
assert_not_predicate @connection, :transaction_open?
assert_not raw_transaction_open?(@connection)
end
test "materialized transaction state can be restored after a reconnect" do
@connection.begin_transaction
assert_predicate @connection, :transaction_open?
# +materialize_transactions+ currently automatically dirties the
# connection, which would make it unrestorable
@connection.transaction_manager.stub(:dirty_current_transaction, nil) do
@connection.materialize_transactions
end
assert raw_transaction_open?(@connection)
@connection.reconnect!(restore_transactions: true)
assert_predicate @connection, :transaction_open?
assert raw_transaction_open?(@connection)
end
test "materialized transaction state is reset after a disconnect" do
@connection.begin_transaction
assert_predicate @connection, :transaction_open?
@connection.materialize_transactions
assert raw_transaction_open?(@connection)
@connection.disconnect!
assert_not_predicate @connection, :transaction_open?
end
test "unmaterialized transaction state is reset after a reconnect" do
@connection.begin_transaction
assert_predicate @connection, :transaction_open?
assert_not raw_transaction_open?(@connection)
@connection.reconnect!
assert_not_predicate @connection, :transaction_open?
assert_not raw_transaction_open?(@connection)
@connection.materialize_transactions
assert_not raw_transaction_open?(@connection)
end
test "unmaterialized transaction state can be restored after a reconnect" do
@connection.begin_transaction
assert_predicate @connection, :transaction_open?
assert_not raw_transaction_open?(@connection)
@connection.reconnect!(restore_transactions: true)
assert_predicate @connection, :transaction_open?
assert_not raw_transaction_open?(@connection)
@connection.materialize_transactions
assert raw_transaction_open?(@connection)
end
test "unmaterialized transaction state is reset after a disconnect" do
@connection.begin_transaction
assert_predicate @connection, :transaction_open?
assert_not raw_transaction_open?(@connection)
@connection.disconnect!
assert_not_predicate @connection, :transaction_open?
end
test "active? detects remote disconnection" do
remote_disconnect @connection
assert_not_predicate @connection, :active?
end
test "verify! restores after remote disconnection" do
remote_disconnect @connection
@connection.verify!
assert_predicate @connection, :active?
end
test "reconnect! restores after remote disconnection" do
remote_disconnect @connection
@connection.reconnect!
assert_predicate @connection, :active?
end
test "querying a 'clean' failed connection restores and succeeds" do
remote_disconnect @connection
@connection.clean! # this simulates a fresh checkout from the pool
# Clean did not verify / fix the connection
assert_not_predicate @connection, :active?
# Because the connection hasn't been verified since checkout,
# and the query cannot safely be retried, the connection will be
# verified before querying.
Post.delete_all
assert_predicate @connection, :active?
end
test "transaction restores after remote disconnection" do
remote_disconnect @connection
Post.transaction do
Post.count
end
assert_predicate @connection, :active?
end
test "active transaction is restored after remote disconnection" do
assert_operator Post.count, :>, 0
Post.transaction do
# +materialize_transactions+ currently automatically dirties the
# connection, which would make it unrestorable
@connection.transaction_manager.stub(:dirty_current_transaction, nil) do
@connection.materialize_transactions
end
remote_disconnect @connection
# Regular queries are not retryable, so the only abstract operation we can
# perform here is a direct verify. The outer transaction means using another
# here would just be a ResetParent.
@connection.verify!
Post.delete_all
assert_equal 0, Post.count
raise ActiveRecord::Rollback
end
# The deletion occurred within the outer transaction (which was then rolled
# back), and not directly on the freshly-reestablished connection, so the
# posts are still there:
assert_operator Post.count, :>, 0
end
test "dirty transaction cannot be restored after remote disconnection" do
invocations = 0
assert_raises ActiveRecord::ConnectionFailed do
Post.transaction do
invocations += 1
Post.delete_all
remote_disconnect @connection
Post.count
end
end
assert_equal 1, invocations # the whole transaction block is not retried
# After the (outermost) transaction block failed, it reconnected
assert_predicate @connection, :active?
assert_operator Post.count, :>, 0
end
private
def raw_transaction_open?(connection)
case connection.class::ADAPTER_NAME
when "PostgreSQL"
connection.instance_variable_get(:@raw_connection).transaction_status == ::PG::PQTRANS_INTRANS
when "Mysql2"
begin
connection.instance_variable_get(:@raw_connection).query("SAVEPOINT transaction_test")
connection.instance_variable_get(:@raw_connection).query("RELEASE SAVEPOINT transaction_test")
true
rescue
false
end
when "SQLite"
begin
connection.instance_variable_get(:@raw_connection).transaction { nil }
false
rescue
true
end
else
skip
end
end
def remote_disconnect(connection)
case connection.class::ADAPTER_NAME
when "PostgreSQL"
unless connection.instance_variable_get(:@raw_connection).transaction_status == ::PG::PQTRANS_INTRANS
connection.instance_variable_get(:@raw_connection).async_exec("begin")
end
connection.instance_variable_get(:@raw_connection).async_exec("set idle_in_transaction_session_timeout = '10ms'")
sleep 0.05
when "Mysql2"
connection.send(:internal_execute, "set @@wait_timeout=1")
sleep 1.2
else
skip
end
end
end
end
end
if ActiveRecord::Base.connection.supports_advisory_locks?

View File

@ -31,8 +31,7 @@ class Mysql2ConnectionTest < ActiveRecord::Mysql2TestCase
def test_no_automatic_reconnection_after_timeout
assert_predicate @connection, :active?
@connection.update("set @@wait_timeout=1")
sleep 2
cause_server_side_disconnect
assert_not_predicate @connection, :active?
ensure
# Repair all fixture connections so other tests won't break.
@ -41,16 +40,14 @@ class Mysql2ConnectionTest < ActiveRecord::Mysql2TestCase
def test_successful_reconnection_after_timeout_with_manual_reconnect
assert_predicate @connection, :active?
@connection.update("set @@wait_timeout=1")
sleep 2
cause_server_side_disconnect
@connection.reconnect!
assert_predicate @connection, :active?
end
def test_successful_reconnection_after_timeout_with_verify
assert_predicate @connection, :active?
@connection.update("set @@wait_timeout=1")
sleep 2
cause_server_side_disconnect
@connection.verify!
assert_predicate @connection, :active?
end
@ -221,6 +218,11 @@ class Mysql2ConnectionTest < ActiveRecord::Mysql2TestCase
end
private
def cause_server_side_disconnect
@connection.update("set @@wait_timeout=1")
sleep 2
end
def test_lock_free(lock_name)
@connection.select_value("SELECT IS_FREE_LOCK(#{@connection.quote(lock_name)})") == 1
end

View File

@ -139,26 +139,10 @@ module ActiveRecord
end
def test_reconnection_after_actual_disconnection_with_verify
original_connection_pid = @connection.query("select pg_backend_pid()")
# Double check we are connected to begin with
assert_predicate @connection, :active?
secondary_connection = ActiveRecord::Base.connection_pool.checkout
secondary_connection.query("select pg_terminate_backend(#{original_connection_pid.first.first})")
ActiveRecord::Base.connection_pool.checkin(secondary_connection)
cause_server_side_disconnect
@connection.verify!
assert_predicate @connection, :active?
# If we get no exception here, then either we re-connected successfully, or
# we never actually got disconnected.
new_connection_pid = @connection.query("select pg_backend_pid()")
assert_not_equal original_connection_pid, new_connection_pid,
"umm -- looks like you didn't break the connection, because we're still " \
"successfully querying with the same connection pid."
ensure
# Repair all fixture connections so other tests won't break.
@fixture_connections.each(&:verify!)
@ -235,6 +219,14 @@ module ActiveRecord
end
private
def cause_server_side_disconnect
unless @connection.instance_variable_get(:@raw_connection).transaction_status == ::PG::PQTRANS_INTRANS
@connection.execute("begin")
end
@connection.execute("set idle_in_transaction_session_timeout = '10ms'")
sleep 0.05
end
def with_warning_suppression
log_level = @connection.client_min_messages
@connection.client_min_messages = "error"

View File

@ -15,6 +15,7 @@ class TestDisconnectedAdapter < ActiveRecord::TestCase
teardown do
return if in_memory_db?
db_config = ActiveRecord::Base.connection_db_config
ActiveRecord::Base.remove_connection
ActiveRecord::Base.establish_connection(db_config)
end

View File

@ -739,6 +739,8 @@ class TransactionTest < ActiveRecord::TestCase
def test_releasing_named_savepoints
Topic.transaction do
Topic.connection.materialize_transactions
Topic.connection.create_savepoint("another")
Topic.connection.release_savepoint("another")