mirror of https://github.com/rails/rails
Merge pull request #41372 from Shopify/ar-relation-async-query
Implement Relation#load_async to schedule the query on the background thread pool
This commit is contained in:
commit
48effc7587
|
@ -1,3 +1,22 @@
|
|||
* Add `ActiveRecord::Relation#load_async`.
|
||||
|
||||
This method schedules the query to be performed asynchronously from a thread pool.
|
||||
|
||||
If the result is accessed before a background thread had the opportunity to perform
|
||||
the query, it will be performed in the foreground.
|
||||
|
||||
This is useful for queries that can be performed long enough before their result will be
|
||||
needed, or for controllers which need to perform several independant queries.
|
||||
|
||||
```ruby
|
||||
def index
|
||||
@categories = Category.some_complex_scope.load_async
|
||||
@posts = Post.some_complex_scope.load_async
|
||||
end
|
||||
```
|
||||
|
||||
*Jean Boussier*
|
||||
|
||||
* Implemented `ActiveRecord::Relation#excluding` method.
|
||||
|
||||
This method excludes the specified record (or collection of records) from
|
||||
|
|
|
@ -2,6 +2,14 @@
|
|||
|
||||
module ActiveRecord
|
||||
class AsynchronousQueriesTracker # :nodoc:
|
||||
module NullSession # :nodoc:
|
||||
class << self
|
||||
def active?
|
||||
true
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class Session # :nodoc:
|
||||
def initialize
|
||||
@active = true
|
||||
|
@ -33,7 +41,7 @@ module ActiveRecord
|
|||
attr_reader :current_session
|
||||
|
||||
def initialize
|
||||
@current_session = nil
|
||||
@current_session = NullSession
|
||||
end
|
||||
|
||||
def start_session
|
||||
|
@ -43,7 +51,7 @@ module ActiveRecord
|
|||
|
||||
def finalize_session
|
||||
@current_session&.finalize
|
||||
@current_session = nil
|
||||
@current_session = NullSession
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -459,6 +459,7 @@ module ActiveRecord
|
|||
|
||||
def schedule_query(future_result) # :nodoc:
|
||||
@async_executor.post { future_result.execute_or_skip }
|
||||
Thread.pass
|
||||
end
|
||||
|
||||
private
|
||||
|
|
|
@ -537,7 +537,7 @@ module ActiveRecord
|
|||
binds,
|
||||
prepare: prepare,
|
||||
)
|
||||
if supports_concurrent_connections? && current_transaction.closed? && ActiveRecord::Base.asynchronous_queries_session
|
||||
if supports_concurrent_connections? && current_transaction.closed?
|
||||
future_result.schedule!(ActiveRecord::Base.asynchronous_queries_session)
|
||||
else
|
||||
future_result.execute!(self)
|
||||
|
|
|
@ -28,6 +28,12 @@ module ActiveRecord
|
|||
execute_query(connection)
|
||||
end
|
||||
|
||||
def cancel
|
||||
@pending = false
|
||||
@error = Canceled
|
||||
self
|
||||
end
|
||||
|
||||
def execute_or_skip
|
||||
return unless pending?
|
||||
|
||||
|
|
|
@ -44,7 +44,14 @@ module ActiveRecord
|
|||
# Post.find_by_sql ["SELECT title FROM posts WHERE author = ? AND created > ?", author_id, start_date]
|
||||
# Post.find_by_sql ["SELECT body FROM comments WHERE author = :user_id OR approved_by = :user_id", { :user_id => user_id }]
|
||||
def find_by_sql(sql, binds = [], preparable: nil, &block)
|
||||
result_set = connection.select_all(sanitize_sql(sql), "#{name} Load", binds, preparable: preparable)
|
||||
_load_from_sql(_query_by_sql(sql, binds, preparable: preparable), &block)
|
||||
end
|
||||
|
||||
def _query_by_sql(sql, binds = [], preparable: nil, async: false) # :nodoc:
|
||||
connection.select_all(sanitize_sql(sql), "#{name} Load", binds, preparable: preparable, async: async)
|
||||
end
|
||||
|
||||
def _load_from_sql(result_set, &block) # :nodoc:
|
||||
column_types = result_set.column_types
|
||||
|
||||
unless column_types.empty?
|
||||
|
|
|
@ -31,6 +31,7 @@ module ActiveRecord
|
|||
@loaded = false
|
||||
@predicate_builder = predicate_builder
|
||||
@delegate_to_klass = false
|
||||
@future_result = nil
|
||||
end
|
||||
|
||||
def initialize_copy(other)
|
||||
|
@ -261,13 +262,20 @@ module ActiveRecord
|
|||
|
||||
# Returns size of the records.
|
||||
def size
|
||||
loaded? ? @records.length : count(:all)
|
||||
if loaded?
|
||||
records.length
|
||||
else
|
||||
count(:all)
|
||||
end
|
||||
end
|
||||
|
||||
# Returns true if there are no records.
|
||||
def empty?
|
||||
return @records.empty? if loaded?
|
||||
!exists?
|
||||
if loaded?
|
||||
records.empty?
|
||||
else
|
||||
!exists?
|
||||
end
|
||||
end
|
||||
|
||||
# Returns true if there are no records.
|
||||
|
@ -642,6 +650,28 @@ module ActiveRecord
|
|||
where(*args).delete_all
|
||||
end
|
||||
|
||||
# Schedule the query to be performed from a background thread pool.
|
||||
#
|
||||
# Post.where(published: true).load_async # => #<ActiveRecord::Relation>
|
||||
def load_async
|
||||
unless loaded?
|
||||
result = exec_main_query(async: connection.current_transaction.closed?)
|
||||
if result.is_a?(Array)
|
||||
@records = result
|
||||
else
|
||||
@future_result = result
|
||||
end
|
||||
@loaded = true
|
||||
end
|
||||
self
|
||||
end
|
||||
|
||||
# Returns <tt>true</tt> if the relation was scheduled on the background
|
||||
# thread pool.
|
||||
def scheduled?
|
||||
!!@future_result
|
||||
end
|
||||
|
||||
# Causes the records to be loaded from the database if they have not
|
||||
# been loaded already. You can use this if for some reason you need
|
||||
# to explicitly load some records before actually using them. The
|
||||
|
@ -649,7 +679,7 @@ module ActiveRecord
|
|||
#
|
||||
# Post.where(published: true).load # => #<ActiveRecord::Relation>
|
||||
def load(&block)
|
||||
unless loaded?
|
||||
if !loaded? || scheduled?
|
||||
@records = exec_queries(&block)
|
||||
@loaded = true
|
||||
end
|
||||
|
@ -664,6 +694,8 @@ module ActiveRecord
|
|||
end
|
||||
|
||||
def reset
|
||||
@future_result&.cancel
|
||||
@future_result = nil
|
||||
@delegate_to_klass = false
|
||||
@to_sql = @arel = @loaded = @should_eager_load = nil
|
||||
@offsets = @take = nil
|
||||
|
@ -855,23 +887,15 @@ module ActiveRecord
|
|||
|
||||
def exec_queries(&block)
|
||||
skip_query_cache_if_necessary do
|
||||
records =
|
||||
if where_clause.contradiction?
|
||||
[]
|
||||
elsif eager_loading?
|
||||
apply_join_dependency do |relation, join_dependency|
|
||||
if relation.null_relation?
|
||||
[]
|
||||
else
|
||||
relation = join_dependency.apply_column_aliases(relation)
|
||||
rows = connection.select_all(relation.arel, "SQL")
|
||||
join_dependency.instantiate(rows, strict_loading_value, &block)
|
||||
end.freeze
|
||||
end
|
||||
else
|
||||
klass.find_by_sql(arel, &block).freeze
|
||||
end
|
||||
rows = if scheduled?
|
||||
future = @future_result
|
||||
@future_result = nil
|
||||
future.result
|
||||
else
|
||||
exec_main_query
|
||||
end
|
||||
|
||||
records = instantiate_records(rows, &block)
|
||||
preload_associations(records) unless skip_preloading_value
|
||||
|
||||
records.each(&:readonly!) if readonly_value
|
||||
|
@ -881,6 +905,37 @@ module ActiveRecord
|
|||
end
|
||||
end
|
||||
|
||||
def exec_main_query(async: false)
|
||||
skip_query_cache_if_necessary do
|
||||
if where_clause.contradiction?
|
||||
[].freeze
|
||||
elsif eager_loading?
|
||||
apply_join_dependency do |relation, join_dependency|
|
||||
if relation.null_relation?
|
||||
[].freeze
|
||||
else
|
||||
relation = join_dependency.apply_column_aliases(relation)
|
||||
@_join_dependency = join_dependency
|
||||
connection.select_all(relation.arel, "SQL", async: async)
|
||||
end
|
||||
end
|
||||
else
|
||||
klass._query_by_sql(arel, async: async)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def instantiate_records(rows, &block)
|
||||
return [].freeze if rows.empty?
|
||||
if eager_loading?
|
||||
records = @_join_dependency.instantiate(rows, strict_loading_value, &block).freeze
|
||||
@_join_dependency = nil
|
||||
records
|
||||
else
|
||||
klass._load_from_sql(rows, &block).freeze
|
||||
end
|
||||
end
|
||||
|
||||
def skip_query_cache_if_necessary
|
||||
if skip_query_cache_value
|
||||
uncached do
|
||||
|
|
|
@ -102,6 +102,10 @@ module ActiveRecord
|
|||
self
|
||||
end
|
||||
|
||||
def cancel # :nodoc:
|
||||
self
|
||||
end
|
||||
|
||||
def cast_values(type_overrides = {}) # :nodoc:
|
||||
if columns.one?
|
||||
# Separated to avoid allocating an array per row
|
||||
|
|
|
@ -385,7 +385,7 @@ module ActiveRecord
|
|||
@connection.disable_query_cache!
|
||||
end
|
||||
|
||||
def test_async_query_outside_session
|
||||
def test_async_query_foreground_fallback
|
||||
status = {}
|
||||
|
||||
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
|
||||
|
@ -395,10 +395,12 @@ module ActiveRecord
|
|||
end
|
||||
end
|
||||
|
||||
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
|
||||
assert_kind_of ActiveRecord::FutureResult, future_result
|
||||
assert_raises ActiveRecord::StatementInvalid do
|
||||
future_result.result
|
||||
@connection.pool.stub(:schedule_query, proc { }) do
|
||||
future_result = @connection.select_all "SELECT * FROM does_not_exists", async: true
|
||||
assert_kind_of ActiveRecord::FutureResult, future_result
|
||||
assert_raises ActiveRecord::StatementInvalid do
|
||||
future_result.result
|
||||
end
|
||||
end
|
||||
|
||||
assert_equal true, status[:executed]
|
||||
|
|
|
@ -0,0 +1,130 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
require "cases/helper"
|
||||
require "models/post"
|
||||
require "models/comment"
|
||||
|
||||
module ActiveRecord
|
||||
class LoadAsyncTest < ActiveRecord::TestCase
|
||||
self.use_transactional_tests = false
|
||||
|
||||
fixtures :posts, :comments
|
||||
|
||||
def test_scheduled?
|
||||
defered_posts = Post.where(author_id: 1).load_async
|
||||
assert_predicate defered_posts, :scheduled?
|
||||
assert_predicate defered_posts, :loaded?
|
||||
defered_posts.to_a
|
||||
assert_not_predicate defered_posts, :scheduled?
|
||||
end
|
||||
|
||||
def test_reset
|
||||
defered_posts = Post.where(author_id: 1).load_async
|
||||
assert_predicate defered_posts, :scheduled?
|
||||
defered_posts.reset
|
||||
assert_not_predicate defered_posts, :scheduled?
|
||||
end
|
||||
|
||||
def test_simple_query
|
||||
expected_records = Post.where(author_id: 1).to_a
|
||||
|
||||
status = {}
|
||||
monitor = Monitor.new
|
||||
condition = monitor.new_cond
|
||||
|
||||
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
|
||||
if event.payload[:name] == "Post Load"
|
||||
status[:executed] = true
|
||||
status[:async] = event.payload[:async]
|
||||
monitor.synchronize { condition.signal }
|
||||
end
|
||||
end
|
||||
|
||||
defered_posts = Post.where(author_id: 1).load_async
|
||||
|
||||
monitor.synchronize do
|
||||
condition.wait_until { status[:executed] }
|
||||
end
|
||||
|
||||
assert_equal expected_records, defered_posts.to_a
|
||||
assert_equal Post.connection.supports_concurrent_connections?, status[:async]
|
||||
ensure
|
||||
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
|
||||
end
|
||||
|
||||
def test_load_async_from_transaction
|
||||
posts = nil
|
||||
Post.transaction do
|
||||
Post.where(author_id: 1).update_all(title: "In Transaction")
|
||||
posts = Post.where(author_id: 1).load_async
|
||||
assert_predicate posts, :scheduled?
|
||||
assert_predicate posts, :loaded?
|
||||
raise ActiveRecord::Rollback
|
||||
end
|
||||
|
||||
assert_not_nil posts
|
||||
assert_equal ["In Transaction"], posts.map(&:title).uniq
|
||||
end
|
||||
|
||||
def test_eager_loading_query
|
||||
expected_records = Post.where(author_id: 1).eager_load(:comments).to_a
|
||||
|
||||
status = {}
|
||||
monitor = Monitor.new
|
||||
condition = monitor.new_cond
|
||||
|
||||
subscriber = ActiveSupport::Notifications.subscribe("sql.active_record") do |event|
|
||||
if event.payload[:name] == "SQL"
|
||||
status[:executed] = true
|
||||
status[:async] = event.payload[:async]
|
||||
monitor.synchronize { condition.signal }
|
||||
end
|
||||
end
|
||||
|
||||
defered_posts = Post.where(author_id: 1).eager_load(:comments).load_async
|
||||
|
||||
assert_predicate defered_posts, :scheduled?
|
||||
|
||||
monitor.synchronize do
|
||||
condition.wait_until { status[:executed] }
|
||||
end
|
||||
|
||||
assert_equal expected_records, defered_posts.to_a
|
||||
assert_queries(0) do
|
||||
defered_posts.each(&:comments)
|
||||
end
|
||||
assert_equal Post.connection.supports_concurrent_connections?, status[:async]
|
||||
ensure
|
||||
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
|
||||
end
|
||||
|
||||
def test_contradiction
|
||||
assert_queries(0) do
|
||||
assert_equal [], Post.where(id: []).load_async.to_a
|
||||
end
|
||||
|
||||
Post.where(id: []).load_async.reset
|
||||
end
|
||||
|
||||
def test_pluck
|
||||
titles = Post.where(author_id: 1).pluck(:title)
|
||||
assert_equal titles, Post.where(author_id: 1).load_async.pluck(:title)
|
||||
end
|
||||
|
||||
def test_size
|
||||
expected_size = Post.where(author_id: 1).size
|
||||
|
||||
defered_posts = Post.where(author_id: 1).load_async
|
||||
|
||||
assert_equal expected_size, defered_posts.size
|
||||
assert_predicate defered_posts, :loaded?
|
||||
end
|
||||
|
||||
def test_empty?
|
||||
defered_posts = Post.where(author_id: 1).load_async
|
||||
|
||||
assert_equal false, defered_posts.empty?
|
||||
assert_predicate defered_posts, :loaded?
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue