Support batching using custom columns

This commit is contained in:
fatkodima 2024-07-13 17:19:06 +03:00
parent 7c1db0dfab
commit fd52a754f2
6 changed files with 241 additions and 87 deletions

View File

@ -1,3 +1,13 @@
* Support batching using custom columns.
```ruby
Product.in_batches(cursor: [:shop_id, :id]) do |relation|
# do something with relation
end
```
*fatkodima*
* Use SQLite `IMMEDIATE` transactions when possible.
Transactions run against the SQLite3 adapter default to IMMEDIATE mode to improve concurrency support and avoid busy exceptions.

View File

@ -5,7 +5,7 @@ require "active_record/relation/batches/batch_enumerator"
module ActiveRecord
# = Active Record \Batches
module Batches
ORDER_IGNORE_MESSAGE = "Scoped order is ignored, it's forced to be batch order."
ORDER_IGNORE_MESSAGE = "Scoped order is ignored, use :cursor with :order to configure custom order."
DEFAULT_ORDER = :asc
# Looping through a collection of records from the database
@ -35,11 +35,13 @@ module ActiveRecord
#
# ==== Options
# * <tt>:batch_size</tt> - Specifies the size of the batch. Defaults to 1000.
# * <tt>:start</tt> - Specifies the primary key value to start from, inclusive of the value.
# * <tt>:finish</tt> - Specifies the primary key value to end at, inclusive of the value.
# * <tt>:start</tt> - Specifies the cursor column value to start from, inclusive of the value.
# * <tt>:finish</tt> - Specifies the cursor column value to end at, inclusive of the value.
# * <tt>:error_on_ignore</tt> - Overrides the application config to specify if an error should be raised when
# an order is present in the relation.
# * <tt>:order</tt> - Specifies the primary key order (can be +:asc+ or +:desc+ or an array consisting
# * <tt>:cursor</tt> - Specifies the column to use for batching (can be a column name or an array
# of column names). Defaults to primary key.
# * <tt>:order</tt> - Specifies the cursor column order (can be +:asc+ or +:desc+ or an array consisting
# of :asc or :desc). Defaults to +:asc+.
#
# class Order < ActiveRecord::Base
@ -71,20 +73,25 @@ module ActiveRecord
#
# NOTE: Order can be ascending (:asc) or descending (:desc). It defaults to
# ascending on the cursor columns, which default to the primary key ("id ASC").
# This also means that this method only works when the primary key is
# This also means that this method only works when the cursor column is
# orderable (e.g. an integer or string).
#
# NOTE: When using custom columns for batching, they should include at least one unique column
# (e.g. primary key) as a tiebreaker. Also, to reduce the likelihood of race conditions,
# all columns should be static (unchangeable after they are set).
#
# NOTE: By its nature, batch processing is subject to race conditions if
# other processes are modifying the database.
def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, order: DEFAULT_ORDER, &block)
def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, cursor: primary_key, order: DEFAULT_ORDER, &block)
if block_given?
find_in_batches(start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, order: order) do |records|
find_in_batches(start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do |records|
records.each(&block)
end
else
enum_for(:find_each, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, order: order) do
enum_for(:find_each, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do
relation = self
apply_limits(relation, start, finish, build_batch_orders(order)).size
cursor = Array(cursor)
apply_limits(relation, cursor, start, finish, build_batch_orders(cursor, order)).size
end
end
end
@ -109,11 +116,13 @@ module ActiveRecord
#
# ==== Options
# * <tt>:batch_size</tt> - Specifies the size of the batch. Defaults to 1000.
# * <tt>:start</tt> - Specifies the primary key value to start from, inclusive of the value.
# * <tt>:finish</tt> - Specifies the primary key value to end at, inclusive of the value.
# * <tt>:start</tt> - Specifies the cursor column value to start from, inclusive of the value.
# * <tt>:finish</tt> - Specifies the cursor column value to end at, inclusive of the value.
# * <tt>:error_on_ignore</tt> - Overrides the application config to specify if an error should be raised when
# an order is present in the relation.
# * <tt>:order</tt> - Specifies the primary key order (can be +:asc+ or +:desc+ or an array consisting
# * <tt>:cursor</tt> - Specifies the column to use for batching (can be a column name or an array
# of column names). Defaults to primary key.
# * <tt>:order</tt> - Specifies the cursor column order (can be +:asc+ or +:desc+ or an array consisting
# of :asc or :desc). Defaults to +:asc+.
#
# class Order < ActiveRecord::Base
@ -140,21 +149,26 @@ module ActiveRecord
#
# NOTE: Order can be ascending (:asc) or descending (:desc). It defaults to
# ascending on the cursor columns, which default to the primary key ("id ASC").
# This also means that this method only works when the primary key is
# This also means that this method only works when the cursor column is
# orderable (e.g. an integer or string).
#
# NOTE: When using custom columns for batching, they should include at least one unique column
# (e.g. primary key) as a tiebreaker. Also, to reduce the likelihood of race conditions,
# all columns should be static (unchangeable after they are set).
#
# NOTE: By its nature, batch processing is subject to race conditions if
# other processes are modifying the database.
def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, order: DEFAULT_ORDER)
def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, cursor: primary_key, order: DEFAULT_ORDER)
relation = self
unless block_given?
return to_enum(:find_in_batches, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, order: order) do
total = apply_limits(relation, start, finish, build_batch_orders(order)).size
return to_enum(:find_in_batches, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do
cursor = Array(cursor)
total = apply_limits(relation, cursor, start, finish, build_batch_orders(cursor, order)).size
(total - 1).div(batch_size) + 1
end
end
in_batches(of: batch_size, start: start, finish: finish, load: true, error_on_ignore: error_on_ignore, order: order) do |batch|
in_batches(of: batch_size, start: start, finish: finish, load: true, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do |batch|
yield batch.to_a
end
end
@ -183,11 +197,13 @@ module ActiveRecord
# ==== Options
# * <tt>:of</tt> - Specifies the size of the batch. Defaults to 1000.
# * <tt>:load</tt> - Specifies if the relation should be loaded. Defaults to false.
# * <tt>:start</tt> - Specifies the primary key value to start from, inclusive of the value.
# * <tt>:finish</tt> - Specifies the primary key value to end at, inclusive of the value.
# * <tt>:start</tt> - Specifies the cursor column value to start from, inclusive of the value.
# * <tt>:finish</tt> - Specifies the cursor column value to end at, inclusive of the value.
# * <tt>:error_on_ignore</tt> - Overrides the application config to specify if an error should be raised when
# an order is present in the relation.
# * <tt>:order</tt> - Specifies the primary key order (can be +:asc+ or +:desc+ or an array consisting
# * <tt>:cursor</tt> - Specifies the column to use for batching (can be a column name or an array
# of column names). Defaults to primary key.
# * <tt>:order</tt> - Specifies the cursor column order (can be +:asc+ or +:desc+ or an array consisting
# of :asc or :desc). Defaults to +:asc+.
#
# class Order < ActiveRecord::Base
@ -231,18 +247,21 @@ module ActiveRecord
#
# NOTE: Order can be ascending (:asc) or descending (:desc). It defaults to
# ascending on the cursor columns, which default to the primary key ("id ASC").
# This also means that this method only works when the primary key is
# This also means that this method only works when the cursor column is
# orderable (e.g. an integer or string).
#
# NOTE: When using custom columns for batching, they should include at least one unique column
# (e.g. primary key) as a tiebreaker. Also, to reduce the likelihood of race conditions,
# all columns should be static (unchangeable after they are set).
#
# NOTE: By its nature, batch processing is subject to race conditions if
# other processes are modifying the database.
def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore: nil, order: DEFAULT_ORDER, use_ranges: nil, &block)
unless Array(order).all? { |ord| [:asc, :desc].include?(ord) }
raise ArgumentError, ":order must be :asc or :desc or an array consisting of :asc or :desc, got #{order.inspect}"
end
def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore: nil, cursor: primary_key, order: DEFAULT_ORDER, use_ranges: nil, &block)
cursor = Array(cursor).map(&:to_s)
ensure_valid_options_for_batching!(cursor, start, finish, order)
unless block
return BatchEnumerator.new(of: of, start: start, finish: finish, relation: self, order: order, use_ranges: use_ranges)
return BatchEnumerator.new(of: of, start: start, finish: finish, relation: self, cursor: cursor, order: order, use_ranges: use_ranges)
end
if arel.orders.present?
@ -261,6 +280,7 @@ module ActiveRecord
relation: self,
start: start,
finish: finish,
cursor: cursor,
order: order,
batch_limit: batch_limit,
&block
@ -271,6 +291,7 @@ module ActiveRecord
start: start,
finish: finish,
load: load,
cursor: cursor,
order: order,
use_ranges: use_ranges,
remaining: remaining,
@ -281,28 +302,51 @@ module ActiveRecord
end
private
def apply_limits(relation, start, finish, batch_orders)
relation = apply_start_limit(relation, start, batch_orders) if start
relation = apply_finish_limit(relation, finish, batch_orders) if finish
# Validates the options shared by the batching methods before any query is built.
#
# +cursor+ is the list of cursor column names (the caller passes an array of strings),
# +start+ / +finish+ are the optional inclusive bounds and +order+ is +:asc+, +:desc+
# or an array of those.
#
# Raises ArgumentError when:
# * +start+ or +finish+ does not supply exactly one value per cursor column;
# * the cursor does not cover the whole primary key and no non-partial unique index
#   is fully covered by the cursor columns;
# * +order+ contains anything other than +:asc+ or +:desc+.
def ensure_valid_options_for_batching!(cursor, start, finish, order)
  if start && Array(start).size != cursor.size
    raise ArgumentError, ":start must contain one value per cursor column"
  end

  if finish && Array(finish).size != cursor.size
    raise ArgumentError, ":finish must contain one value per cursor column"
  end

  # The schema cache is only consulted when the cursor leaves out part of the primary key.
  if (Array(primary_key) - cursor).any?
    indexes = model.schema_cache.indexes(table_name)
    # Partial indexes (those with a WHERE clause) are skipped.
    unique_index = indexes.find { |index| index.unique && index.where.nil? && (Array(index.columns) - cursor).empty? }

    unless unique_index
      raise ArgumentError, ":cursor must include a primary key or other unique column(s)"
    end
  end

  # Check membership element by element: a block-less +any?+ on the leftover elements
  # would ignore +nil+ and +false+ and let values such as <tt>[:asc, nil]</tt> through.
  unless Array(order).all? { |direction| [:asc, :desc].include?(direction) }
    raise ArgumentError, ":order must be :asc or :desc or an array consisting of :asc or :desc, got #{order.inspect}"
  end
end
# Restricts +relation+ to the window described by +start+ and +finish+.
# A bound that is nil (or false) is left open; with neither bound the
# relation is returned as given.
def apply_limits(relation, cursor, start, finish, batch_orders)
  lower_bounded = start ? apply_start_limit(relation, cursor, start, batch_orders) : relation
  return lower_bounded unless finish

  apply_finish_limit(lower_bounded, cursor, finish, batch_orders)
end
def apply_start_limit(relation, start, batch_orders)
def apply_start_limit(relation, cursor, start, batch_orders)
operators = batch_orders.map do |_column, order|
order == :desc ? :lteq : :gteq
end
batch_condition(relation, primary_key, start, operators)
batch_condition(relation, cursor, start, operators)
end
def apply_finish_limit(relation, finish, batch_orders)
def apply_finish_limit(relation, cursor, finish, batch_orders)
operators = batch_orders.map do |_column, order|
order == :desc ? :gteq : :lteq
end
batch_condition(relation, primary_key, finish, operators)
batch_condition(relation, cursor, finish, operators)
end
def batch_condition(relation, columns, values, operators)
cursor_positions = Array(columns).zip(Array(values), operators)
def batch_condition(relation, cursor, values, operators)
cursor_positions = cursor.zip(Array(values), operators)
first_clause_column, first_clause_value, operator = cursor_positions.pop
where_clause = predicate_builder[first_clause_column, first_clause_value, operator]
@ -316,9 +360,9 @@ module ActiveRecord
relation.where(where_clause)
end
def build_batch_orders(order)
get_the_order_of_primary_key(order).map do |column, ord|
[column, ord || DEFAULT_ORDER]
# Pairs every cursor column with its sort direction, returning
# <tt>[[column, direction], ...]</tt>. +order+ may be a single direction or an
# array; columns without a matching entry fall back to DEFAULT_ORDER.
def build_batch_orders(cursor, order)
  directions = Array(order)

  cursor.each_with_index.map do |column, position|
    [column, directions[position] || DEFAULT_ORDER]
  end
end
@ -332,23 +376,23 @@ module ActiveRecord
end
end
def get_the_order_of_primary_key(order)
Array(primary_key).zip(Array(order))
end
def batch_on_loaded_relation(relation:, start:, finish:, order:, batch_limit:)
def batch_on_loaded_relation(relation:, start:, finish:, cursor:, order:, batch_limit:)
records = relation.to_a
order = build_batch_orders(order).map(&:second)
order = build_batch_orders(cursor, order).map(&:second)
if start || finish
records = records.filter do |record|
(start.nil? || compare_values_for_order(record.id, start, order) >= 0) &&
(finish.nil? || compare_values_for_order(record.id, finish, order) <= 0)
values = record_cursor_values(record, cursor)
(start.nil? || compare_values_for_order(values, Array(start), order) >= 0) &&
(finish.nil? || compare_values_for_order(values, Array(finish), order) <= 0)
end
end
records.sort! do |record1, record2|
compare_values_for_order(record1.id, record2.id, order)
values1 = record_cursor_values(record1, cursor)
values2 = record_cursor_values(record2, cursor)
compare_values_for_order(values1, values2, order)
end
records.each_slice(batch_limit) do |subrecords|
@ -361,32 +405,28 @@ module ActiveRecord
nil
end
# This is a custom implementation of `<=>` operator,
# which also takes into account how the collection will be ordered.
def compare_values_for_order(value1, value2, order)
# Multiple column values.
if value1.is_a?(Array)
value1.each_with_index do |element1, index|
element2 = value2[index]
direction = order[index]
comparison = element1 <=> element2
comparison = -comparison if direction == :desc
return comparison if comparison != 0
end
0
# Single column values.
elsif order.first == :asc
value1 <=> value2
else
value2 <=> value1
end
# Reads the cursor column values off a loaded +record+, in cursor order.
# Columns absent from the record's attributes are skipped and a column
# listed more than once contributes a single value.
def record_cursor_values(record, cursor)
  attributes = record.attributes
  present_columns = cursor.uniq.select { |column| attributes.key?(column) }
  attributes.values_at(*present_columns)
end
def batch_on_unloaded_relation(relation:, start:, finish:, load:, order:, use_ranges:, remaining:, batch_limit:)
batch_orders = build_batch_orders(order)
# Order-aware replacement for +<=>+ on two lists of cursor values.
# Positions are compared left to right; a position whose direction is +:desc+
# has its result inverted. The first non-zero result wins, and 0 is returned
# when every position compares equal.
def compare_values_for_order(values1, values2, order)
  values1.zip(values2, order) do |left, right, direction|
    result = left <=> right
    result = -result if direction == :desc
    return result unless result == 0
  end

  0
end
def batch_on_unloaded_relation(relation:, start:, finish:, load:, cursor:, order:, use_ranges:, remaining:, batch_limit:)
batch_orders = build_batch_orders(cursor, order)
relation = relation.reorder(batch_orders.to_h).limit(batch_limit)
relation = apply_limits(relation, start, finish, batch_orders)
relation = apply_limits(relation, cursor, start, finish, batch_orders)
relation.skip_query_cache! # Retaining the results in the query cache would undermine the point of batching
batch_relation = relation
empty_scope = to_sql == model.unscoped.all.to_sql
@ -394,33 +434,36 @@ module ActiveRecord
loop do
if load
records = batch_relation.records
ids = records.map(&:id)
yielded_relation = where(primary_key => ids)
values = records.pluck(*cursor)
yielded_relation = where(cursor => values)
yielded_relation.load_records(records)
elsif (empty_scope && use_ranges != false) || use_ranges
ids = batch_relation.ids
finish = ids.last
values = batch_relation.pluck(*cursor)
finish = values.last
if finish
yielded_relation = apply_finish_limit(batch_relation, finish, batch_orders)
yielded_relation = apply_finish_limit(batch_relation, cursor, finish, batch_orders)
yielded_relation = yielded_relation.except(:limit, :order)
yielded_relation.skip_query_cache!(false)
end
else
ids = batch_relation.ids
yielded_relation = where(primary_key => ids)
values = batch_relation.pluck(*cursor)
yielded_relation = where(cursor => values)
end
break if ids.empty?
break if values.empty?
primary_key_offset = ids.last
raise ArgumentError.new("Primary key not included in the custom select clause") unless primary_key_offset
if values.flatten.any?(nil)
raise ArgumentError, "Not all of the batch cursor columns were included in the custom select clause "\
"or some columns contain nil."
end
yield yielded_relation
break if ids.length < batch_limit
break if values.length < batch_limit
if limit_value
remaining -= ids.length
remaining -= values.length
if remaining == 0
# Saves a useless iteration when the limit is a multiple of the
@ -438,7 +481,8 @@ module ActiveRecord
end
operators << (last_order == :desc ? :lt : :gt)
batch_relation = batch_condition(relation, primary_key, primary_key_offset, operators)
cursor_value = values.last
batch_relation = batch_condition(relation, cursor, cursor_value, operators)
end
nil

View File

@ -5,11 +5,12 @@ module ActiveRecord
class BatchEnumerator
include Enumerable
def initialize(of: 1000, start: nil, finish: nil, relation:, order: :asc, use_ranges: nil) # :nodoc:
def initialize(of: 1000, start: nil, finish: nil, relation:, cursor:, order: :asc, use_ranges: nil) # :nodoc:
@of = of
@relation = relation
@start = start
@finish = finish
@cursor = cursor
@order = order
@use_ranges = use_ranges
end
@ -52,7 +53,7 @@ module ActiveRecord
def each_record(&block)
return to_enum(:each_record) unless block_given?
@relation.to_enum(:in_batches, of: @of, start: @start, finish: @finish, load: true, order: @order).each do |relation|
@relation.to_enum(:in_batches, of: @of, start: @start, finish: @finish, load: true, cursor: @cursor, order: @order).each do |relation|
relation.records.each(&block)
end
end
@ -105,7 +106,7 @@ module ActiveRecord
# relation.update_all(awesome: true)
# end
def each(&block)
enum = @relation.to_enum(:in_batches, of: @of, start: @start, finish: @finish, load: false, order: @order, use_ranges: @use_ranges)
enum = @relation.to_enum(:in_batches, of: @of, start: @start, finish: @finish, load: false, cursor: @cursor, order: @order, use_ranges: @use_ranges)
return enum.each(&block) if block_given?
enum
end

View File

@ -77,6 +77,11 @@ module ActiveRecord
return ["1=0"] if attributes.empty?
attributes.flat_map do |key, value|
if key.is_a?(Array) && key.size == 1
key = key.first
value = value.flatten
end
if key.is_a?(Array)
queries = Array(value).map do |ids_set|
raise ArgumentError, "Expected corresponding value for #{key} to be an Array" unless ids_set.is_a?(Array)

View File

@ -557,6 +557,27 @@ class EachTest < ActiveRecord::TestCase
assert_equal expected_orders, orders
end
# An already loaded relation must be batched in memory, without issuing queries,
# when an explicit +:cursor+ and a descending +:order+ are given.
# NOTE(review): the cursor used here is +:id+; the unique index on +title+ is
# created and dropped but not used as a cursor - confirm that is intended.
def test_in_batches_when_loaded_iterates_using_custom_column
  c = Post.lease_connection
  c.add_index(:posts, :title, unique: true)
  ActiveRecord::Base.schema_cache.clear!

  ordered_posts = Post.order(id: :desc).load
  yielded_posts = []

  assert_no_queries do
    batches = ordered_posts.in_batches(of: 1, cursor: :id, order: :desc)
    batches.each_record { |post| yielded_posts << post }
  end

  assert_equal ordered_posts.to_a, yielded_posts
ensure
  c.remove_index(:posts, :title)
end
def test_in_batches_should_return_relations
assert_queries_count(@total + 1) do
Post.in_batches(of: 1) do |relation|
@ -760,6 +781,79 @@ class EachTest < ActiveRecord::TestCase
assert_equal 2, person.reload.author_id # incremented only once
end
# A scalar +:start+ cannot address a two-column cursor, so the call must be rejected.
def test_in_batches_with_custom_columns_raises_when_start_missing_items
  expected_message = ":start must contain one value per cursor column"

  assert_raises(ArgumentError, match: expected_message) do
    Post.in_batches(start: 1, cursor: [:author_id, :id]) { }
  end
end
# A scalar +:finish+ cannot address a two-column cursor, so the call must be rejected.
def test_in_batches_with_custom_columns_raises_when_finish_missing_items
  expected_message = ":finish must contain one value per cursor column"

  assert_raises(ArgumentError, match: expected_message) do
    Post.in_batches(finish: 10, cursor: [:author_id, :id]) { }
  end
end
# Walks through the index states of +posts.title+ and checks that it is accepted
# as a batching cursor only while a full (non-partial) unique index covers it.
def test_in_batches_with_custom_columns_raises_when_non_unique_columns
  # Leased before any assertion so the +ensure+ clause always has a connection;
  # otherwise an early failure would be masked by a NoMethodError on nil.
  c = Post.lease_connection
  ActiveRecord::Base.schema_cache.clear!

  # Non unique column.
  assert_raises(ArgumentError, match: /must include a primary key/) do
    Post.in_batches(cursor: :title) { }
  end

  # Primary key column.
  assert_nothing_raised do
    Post.in_batches(cursor: :id) { }
  end

  c.add_index(:posts, :title)
  ActiveRecord::Base.schema_cache.clear!

  # Non unique indexed column.
  assert_raises(ArgumentError, match: /must include a primary key/) do
    Post.in_batches(cursor: :title) { }
  end

  c.remove_index(:posts, :title)

  if current_adapter?(:PostgreSQLAdapter)
    c.add_index(:posts, :title, unique: true, where: "id > 5")
    ActiveRecord::Base.schema_cache.clear!

    # Column having a unique, but partial, index.
    assert_raises(ArgumentError, match: /must include a primary key/) do
      Post.in_batches(cursor: :title) { }
    end

    c.remove_index(:posts, :title)
  end

  c.add_index(:posts, :title, unique: true)
  ActiveRecord::Base.schema_cache.clear!

  assert_nothing_raised do
    Post.in_batches(cursor: :title) { }
  end
ensure
  # The index may or may not exist depending on where the test stopped, so the
  # cleanup must not raise and hide the original failure.
  c.remove_index(:posts, :title, if_exists: true)
  # Drop cached index metadata so later tests do not see the removed index.
  ActiveRecord::Base.schema_cache.clear!
end
# Batching an unloaded relation with an explicit +:cursor+ and descending +:order+
# must yield every post in that order.
# NOTE(review): the cursor used here is +:id+; the unique index on +title+ is
# created and dropped but not used as a cursor - confirm that is intended.
def test_in_batches_iterating_using_custom_columns
  c = Post.lease_connection
  c.add_index(:posts, :title, unique: true)
  ActiveRecord::Base.schema_cache.clear!

  posts_by_id_desc = Post.order(id: :desc).to_a
  yielded_posts = []

  batches = Post.in_batches(of: 1, cursor: :id, order: :desc)
  batches.each_record { |post| yielded_posts << post }

  assert_equal posts_by_id_desc, yielded_posts
ensure
  c.remove_index(:posts, :title)
end
def test_find_in_batches_should_return_a_sized_enumerator
assert_equal 11, Post.find_in_batches(batch_size: 1).size
assert_equal 6, Post.find_in_batches(batch_size: 2).size

View File

@ -124,7 +124,7 @@ module ActiveRecord
assert_match(/Expected corresponding value for.*to be an Array/, error.message)
error = assert_raise ArgumentError do
Cpk::Book.where([:one] => 1)
Cpk::Book.where([:one, :two] => 1)
end
assert_match(/Expected corresponding value for.*to be an Array/, error.message)