From 2a90104989b9f1653337d08871610d394b2d626d Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Mon, 8 Feb 2021 14:22:26 +0100 Subject: [PATCH] Implement Relation#load_async to schedule the query on the background thread pool This is built on previous async support added to Adapter#select_all. --- activerecord/CHANGELOG.md | 19 +++ .../asynchronous_queries_tracker.rb | 12 +- .../abstract/connection_pool.rb | 1 + .../abstract/database_statements.rb | 2 +- .../lib/active_record/future_result.rb | 6 + activerecord/lib/active_record/querying.rb | 9 +- activerecord/lib/active_record/relation.rb | 95 ++++++++++--- activerecord/lib/active_record/result.rb | 4 + activerecord/test/cases/adapter_test.rb | 12 +- .../test/cases/relation/load_async_test.rb | 130 ++++++++++++++++++ 10 files changed, 261 insertions(+), 29 deletions(-) create mode 100644 activerecord/test/cases/relation/load_async_test.rb diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md index 51b5b7a128c..5d9e094005a 100644 --- a/activerecord/CHANGELOG.md +++ b/activerecord/CHANGELOG.md @@ -1,3 +1,22 @@ +* Add `ActiveRecord::Relation#load_async`. + + This method schedules the query to be performed asynchronously from a thread pool. + + If the result is accessed before a background thread had the opportunity to perform + the query, it will be performed in the foreground. + + This is useful for queries that can be performed long enough before their result will be + needed, or for controllers which need to perform several independant queries. + + ```ruby + def index + @categories = Category.some_complex_scope.load_async + @posts = Post.some_complex_scope.load_async + end + ``` + + *Jean Boussier* + * Implemented `ActiveRecord::Relation#excluding` method. This method excludes the specified record (or collection of records) from diff --git a/activerecord/lib/active_record/asynchronous_queries_tracker.rb b/activerecord/lib/active_record/asynchronous_queries_tracker.rb index c36a4892e17..91bc155c8ce 100644 --- a/activerecord/lib/active_record/asynchronous_queries_tracker.rb +++ b/activerecord/lib/active_record/asynchronous_queries_tracker.rb @@ -2,6 +2,14 @@ module ActiveRecord class AsynchronousQueriesTracker # :nodoc: + module NullSession # :nodoc: + class << self + def active? + true + end + end + end + class Session # :nodoc: def initialize @active = true @@ -33,7 +41,7 @@ module ActiveRecord attr_reader :current_session def initialize - @current_session = nil + @current_session = NullSession end def start_session @@ -43,7 +51,7 @@ module ActiveRecord def finalize_session @current_session&.finalize - @current_session = nil + @current_session = NullSession end end end diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index a929c3d67b7..5670790844f 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -459,6 +459,7 @@ module ActiveRecord def schedule_query(future_result) # :nodoc: @async_executor.post { future_result.execute_or_skip } + Thread.pass end private diff --git a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb index b3e1b4ab65c..14d6127bc5a 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb @@ -537,7 +537,7 @@ module ActiveRecord binds, prepare: prepare, ) - if supports_concurrent_connections? && current_transaction.closed? && ActiveRecord::Base.asynchronous_queries_session + if supports_concurrent_connections? && current_transaction.closed? future_result.schedule!(ActiveRecord::Base.asynchronous_queries_session) else future_result.execute!(self) diff --git a/activerecord/lib/active_record/future_result.rb b/activerecord/lib/active_record/future_result.rb index f4a3681bc22..77d5624be3d 100644 --- a/activerecord/lib/active_record/future_result.rb +++ b/activerecord/lib/active_record/future_result.rb @@ -28,6 +28,12 @@ module ActiveRecord execute_query(connection) end + def cancel + @pending = false + @error = Canceled + self + end + def execute_or_skip return unless pending? diff --git a/activerecord/lib/active_record/querying.rb b/activerecord/lib/active_record/querying.rb index 8328a0cdb1a..46ee11ade84 100644 --- a/activerecord/lib/active_record/querying.rb +++ b/activerecord/lib/active_record/querying.rb @@ -44,7 +44,14 @@ module ActiveRecord # Post.find_by_sql ["SELECT title FROM posts WHERE author = ? AND created > ?", author_id, start_date] # Post.find_by_sql ["SELECT body FROM comments WHERE author = :user_id OR approved_by = :user_id", { :user_id => user_id }] def find_by_sql(sql, binds = [], preparable: nil, &block) - result_set = connection.select_all(sanitize_sql(sql), "#{name} Load", binds, preparable: preparable) + _load_from_sql(_query_by_sql(sql, binds, preparable: preparable), &block) + end + + def _query_by_sql(sql, binds = [], preparable: nil, async: false) # :nodoc: + connection.select_all(sanitize_sql(sql), "#{name} Load", binds, preparable: preparable, async: async) + end + + def _load_from_sql(result_set, &block) # :nodoc: column_types = result_set.column_types unless column_types.empty? diff --git a/activerecord/lib/active_record/relation.rb b/activerecord/lib/active_record/relation.rb index ff9e172c51a..e6ec2a3a3de 100644 --- a/activerecord/lib/active_record/relation.rb +++ b/activerecord/lib/active_record/relation.rb @@ -31,6 +31,7 @@ module ActiveRecord @loaded = false @predicate_builder = predicate_builder @delegate_to_klass = false + @future_result = nil end def initialize_copy(other) @@ -261,13 +262,20 @@ module ActiveRecord # Returns size of the records. def size - loaded? ? @records.length : count(:all) + if loaded? + records.length + else + count(:all) + end end # Returns true if there are no records. def empty? - return @records.empty? if loaded? - !exists? + if loaded? + records.empty? + else + !exists? + end end # Returns true if there are no records. @@ -642,6 +650,28 @@ module ActiveRecord where(*args).delete_all end + # Schedule the query to be performed from a background thread pool. + # + # Post.where(published: true).load_async # => # + def load_async + unless loaded? + result = exec_main_query(async: connection.current_transaction.closed?) + if result.is_a?(Array) + @records = result + else + @future_result = result + end + @loaded = true + end + self + end + + # Returns true if the relation was scheduled on the background + # thread pool. + def scheduled? + !!@future_result + end + # Causes the records to be loaded from the database if they have not # been loaded already. You can use this if for some reason you need # to explicitly load some records before actually using them. The @@ -649,7 +679,7 @@ module ActiveRecord # # Post.where(published: true).load # => # def load(&block) - unless loaded? + if !loaded? || scheduled? @records = exec_queries(&block) @loaded = true end @@ -664,6 +694,8 @@ module ActiveRecord end def reset + @future_result&.cancel + @future_result = nil @delegate_to_klass = false @to_sql = @arel = @loaded = @should_eager_load = nil @offsets = @take = nil @@ -855,23 +887,15 @@ module ActiveRecord def exec_queries(&block) skip_query_cache_if_necessary do - records = - if where_clause.contradiction? - [] - elsif eager_loading? - apply_join_dependency do |relation, join_dependency| - if relation.null_relation? - [] - else - relation = join_dependency.apply_column_aliases(relation) - rows = connection.select_all(relation.arel, "SQL") - join_dependency.instantiate(rows, strict_loading_value, &block) - end.freeze - end - else - klass.find_by_sql(arel, &block).freeze - end + rows = if scheduled? + future = @future_result + @future_result = nil + future.result + else + exec_main_query + end + records = instantiate_records(rows, &block) preload_associations(records) unless skip_preloading_value records.each(&:readonly!) if readonly_value @@ -881,6 +905,37 @@ module ActiveRecord end end + def exec_main_query(async: false) + skip_query_cache_if_necessary do + if where_clause.contradiction? + [].freeze + elsif eager_loading? + apply_join_dependency do |relation, join_dependency| + if relation.null_relation? + [].freeze + else + relation = join_dependency.apply_column_aliases(relation) + @_join_dependency = join_dependency + connection.select_all(relation.arel, "SQL", async: async) + end + end + else + klass._query_by_sql(arel, async: async) + end + end + end + + def instantiate_records(rows, &block) + return [].freeze if rows.empty? + if eager_loading? + records = @_join_dependency.instantiate(rows, strict_loading_value, &block).freeze + @_join_dependency = nil + records + else + klass._load_from_sql(rows, &block).freeze + end + end + def skip_query_cache_if_necessary if skip_query_cache_value uncached do diff --git a/activerecord/lib/active_record/result.rb b/activerecord/lib/active_record/result.rb index 34cc304cc21..c00665ad792 100644 --- a/activerecord/lib/active_record/result.rb +++ b/activerecord/lib/active_record/result.rb @@ -102,6 +102,10 @@ module ActiveRecord self end + def cancel # :nodoc: + self + end + def cast_values(type_overrides = {}) # :nodoc: if columns.one? # Separated to avoid allocating an array per row diff --git a/activerecord/test/cases/adapter_test.rb b/activerecord/test/cases/adapter_test.rb index ce50c4fba63..92f1d207af6 100644 --- a/activerecord/test/cases/adapter_test.rb +++ b/activerecord/test/cases/adapter_test.rb @@ -385,7 +385,7 @@ module ActiveRecord @connection.disable_query_cache! end - def test_async_query_outside_session + def test_async_query_foreground_fallback status = {} subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| @@ -395,10 +395,12 @@ module ActiveRecord end end - future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true - assert_kind_of ActiveRecord::FutureResult, future_result - assert_raises ActiveRecord::StatementInvalid do - future_result.result + @connection.pool.stub(:schedule_query, proc { }) do + future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true + assert_kind_of ActiveRecord::FutureResult, future_result + assert_raises ActiveRecord::StatementInvalid do + future_result.result + end end assert_equal true, status[:executed] diff --git a/activerecord/test/cases/relation/load_async_test.rb b/activerecord/test/cases/relation/load_async_test.rb new file mode 100644 index 00000000000..8f2a4df7424 --- /dev/null +++ b/activerecord/test/cases/relation/load_async_test.rb @@ -0,0 +1,130 @@ +# frozen_string_literal: true + +require "cases/helper" +require "models/post" +require "models/comment" + +module ActiveRecord + class LoadAsyncTest < ActiveRecord::TestCase + self.use_transactional_tests = false + + fixtures :posts, :comments + + def test_scheduled? + defered_posts = Post.where(author_id: 1).load_async + assert_predicate defered_posts, :scheduled? + assert_predicate defered_posts, :loaded? + defered_posts.to_a + assert_not_predicate defered_posts, :scheduled? + end + + def test_reset + defered_posts = Post.where(author_id: 1).load_async + assert_predicate defered_posts, :scheduled? + defered_posts.reset + assert_not_predicate defered_posts, :scheduled? + end + + def test_simple_query + expected_records = Post.where(author_id: 1).to_a + + status = {} + monitor = Monitor.new + condition = monitor.new_cond + + subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| + if event.payload[:name] == "Post Load" + status[:executed] = true + status[:async] = event.payload[:async] + monitor.synchronize { condition.signal } + end + end + + defered_posts = Post.where(author_id: 1).load_async + + monitor.synchronize do + condition.wait_until { status[:executed] } + end + + assert_equal expected_records, defered_posts.to_a + assert_equal Post.connection.supports_concurrent_connections?, status[:async] + ensure + ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber + end + + def test_load_async_from_transaction + posts = nil + Post.transaction do + Post.where(author_id: 1).update_all(title: "In Transaction") + posts = Post.where(author_id: 1).load_async + assert_predicate posts, :scheduled? + assert_predicate posts, :loaded? + raise ActiveRecord::Rollback + end + + assert_not_nil posts + assert_equal ["In Transaction"], posts.map(&:title).uniq + end + + def test_eager_loading_query + expected_records = Post.where(author_id: 1).eager_load(:comments).to_a + + status = {} + monitor = Monitor.new + condition = monitor.new_cond + + subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event| + if event.payload[:name] == "SQL" + status[:executed] = true + status[:async] = event.payload[:async] + monitor.synchronize { condition.signal } + end + end + + defered_posts = Post.where(author_id: 1).eager_load(:comments).load_async + + assert_predicate defered_posts, :scheduled? + + monitor.synchronize do + condition.wait_until { status[:executed] } + end + + assert_equal expected_records, defered_posts.to_a + assert_queries(0) do + defered_posts.each(&:comments) + end + assert_equal Post.connection.supports_concurrent_connections?, status[:async] + ensure + ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber + end + + def test_contradiction + assert_queries(0) do + assert_equal [], Post.where(id: []).load_async.to_a + end + + Post.where(id: []).load_async.reset + end + + def test_pluck + titles = Post.where(author_id: 1).pluck(:title) + assert_equal titles, Post.where(author_id: 1).load_async.pluck(:title) + end + + def test_size + expected_size = Post.where(author_id: 1).size + + defered_posts = Post.where(author_id: 1).load_async + + assert_equal expected_size, defered_posts.size + assert_predicate defered_posts, :loaded? + end + + def test_empty? + defered_posts = Post.where(author_id: 1).load_async + + assert_equal false, defered_posts.empty? + assert_predicate defered_posts, :loaded? + end + end +end