diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md
index d4dddae62fb..de5c6d917ba 100644
--- a/activerecord/CHANGELOG.md
+++ b/activerecord/CHANGELOG.md
@@ -1,3 +1,13 @@
+* Support batching using custom columns.
+
+ ```ruby
+ Product.in_batches(cursor: [:shop_id, :id]) do |relation|
+ # do something with relation
+ end
+ ```
+
+ *fatkodima*
+
* Use SQLite `IMMEDIATE` transactions when possible.
Transactions run against the SQLite3 adapter default to IMMEDIATE mode to improve concurrency support and avoid busy exceptions.
diff --git a/activerecord/lib/active_record/relation/batches.rb b/activerecord/lib/active_record/relation/batches.rb
index 94e30d6bb81..ef00d5a9ac4 100644
--- a/activerecord/lib/active_record/relation/batches.rb
+++ b/activerecord/lib/active_record/relation/batches.rb
@@ -5,7 +5,7 @@ require "active_record/relation/batches/batch_enumerator"
module ActiveRecord
# = Active Record \Batches
module Batches
- ORDER_IGNORE_MESSAGE = "Scoped order is ignored, it's forced to be batch order."
+ ORDER_IGNORE_MESSAGE = "Scoped order is ignored, use :cursor with :order to configure custom order."
DEFAULT_ORDER = :asc
# Looping through a collection of records from the database
@@ -35,11 +35,13 @@ module ActiveRecord
#
# ==== Options
# * :batch_size - Specifies the size of the batch. Defaults to 1000.
- # * :start - Specifies the primary key value to start from, inclusive of the value.
- # * :finish - Specifies the primary key value to end at, inclusive of the value.
+ # * :start - Specifies the cursor column value to start from, inclusive of the value.
+ # * :finish - Specifies the cursor column value to end at, inclusive of the value.
# * :error_on_ignore - Overrides the application config to specify if an error should be raised when
# an order is present in the relation.
- # * :order - Specifies the primary key order (can be +:asc+ or +:desc+ or an array consisting
+ # * :cursor - Specifies the column to use for batching (can be a column name or an array
+ # of column names). Defaults to primary key.
+ # * :order - Specifies the cursor column order (can be +:asc+ or +:desc+ or an array consisting
# of :asc or :desc). Defaults to +:asc+.
#
# class Order < ActiveRecord::Base
@@ -71,20 +73,25 @@ module ActiveRecord
#
# NOTE: Order can be ascending (:asc) or descending (:desc). It is automatically set to
# ascending on the primary key ("id ASC").
- # This also means that this method only works when the primary key is
+ # This also means that this method only works when the cursor column is
# orderable (e.g. an integer or string).
#
+ # NOTE: When using custom columns for batching, they should include at least one unique column
+ # (e.g. primary key) as a tiebreaker. Also, to reduce the likelihood of race conditions,
+ # all columns should be static (unchangeable after it was set).
+ #
# NOTE: By its nature, batch processing is subject to race conditions if
# other processes are modifying the database.
- def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, order: DEFAULT_ORDER, &block)
+ def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, cursor: primary_key, order: DEFAULT_ORDER, &block)
if block_given?
- find_in_batches(start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, order: order) do |records|
+ find_in_batches(start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do |records|
records.each(&block)
end
else
- enum_for(:find_each, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, order: order) do
+ enum_for(:find_each, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do
relation = self
- apply_limits(relation, start, finish, build_batch_orders(order)).size
+ cursor = Array(cursor)
+ apply_limits(relation, cursor, start, finish, build_batch_orders(cursor, order)).size
end
end
end
@@ -109,11 +116,13 @@ module ActiveRecord
#
# ==== Options
# * :batch_size - Specifies the size of the batch. Defaults to 1000.
- # * :start - Specifies the primary key value to start from, inclusive of the value.
- # * :finish - Specifies the primary key value to end at, inclusive of the value.
+ # * :start - Specifies the cursor column value to start from, inclusive of the value.
+ # * :finish - Specifies the cursor column value to end at, inclusive of the value.
# * :error_on_ignore - Overrides the application config to specify if an error should be raised when
# an order is present in the relation.
- # * :order - Specifies the primary key order (can be +:asc+ or +:desc+ or an array consisting
+ # * :cursor - Specifies the column to use for batching (can be a column name or an array
+ # of column names). Defaults to primary key.
+ # * :order - Specifies the cursor column order (can be +:asc+ or +:desc+ or an array consisting
# of :asc or :desc). Defaults to +:asc+.
#
# class Order < ActiveRecord::Base
@@ -140,21 +149,26 @@ module ActiveRecord
#
# NOTE: Order can be ascending (:asc) or descending (:desc). It is automatically set to
# ascending on the primary key ("id ASC").
- # This also means that this method only works when the primary key is
+ # This also means that this method only works when the cursor column is
# orderable (e.g. an integer or string).
#
+ # NOTE: When using custom columns for batching, they should include at least one unique column
+ # (e.g. primary key) as a tiebreaker. Also, to reduce the likelihood of race conditions,
+ # all columns should be static (unchangeable after it was set).
+ #
# NOTE: By its nature, batch processing is subject to race conditions if
# other processes are modifying the database.
- def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, order: DEFAULT_ORDER)
+ def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, cursor: primary_key, order: DEFAULT_ORDER)
relation = self
unless block_given?
- return to_enum(:find_in_batches, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, order: order) do
- total = apply_limits(relation, start, finish, build_batch_orders(order)).size
+ return to_enum(:find_in_batches, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do
+ cursor = Array(cursor)
+ total = apply_limits(relation, cursor, start, finish, build_batch_orders(cursor, order)).size
(total - 1).div(batch_size) + 1
end
end
- in_batches(of: batch_size, start: start, finish: finish, load: true, error_on_ignore: error_on_ignore, order: order) do |batch|
+ in_batches(of: batch_size, start: start, finish: finish, load: true, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do |batch|
yield batch.to_a
end
end
@@ -183,11 +197,13 @@ module ActiveRecord
# ==== Options
# * :of - Specifies the size of the batch. Defaults to 1000.
# * :load - Specifies if the relation should be loaded. Defaults to false.
- # * :start - Specifies the primary key value to start from, inclusive of the value.
- # * :finish - Specifies the primary key value to end at, inclusive of the value.
+ # * :start - Specifies the cursor column value to start from, inclusive of the value.
+ # * :finish - Specifies the cursor column value to end at, inclusive of the value.
# * :error_on_ignore - Overrides the application config to specify if an error should be raised when
# an order is present in the relation.
- # * :order - Specifies the primary key order (can be +:asc+ or +:desc+ or an array consisting
+ # * :cursor - Specifies the column to use for batching (can be a column name or an array
+ # of column names). Defaults to primary key.
+ # * :order - Specifies the cursor column order (can be +:asc+ or +:desc+ or an array consisting
# of :asc or :desc). Defaults to +:asc+.
#
# class Order < ActiveRecord::Base
@@ -231,18 +247,21 @@ module ActiveRecord
#
# NOTE: Order can be ascending (:asc) or descending (:desc). It is automatically set to
# ascending on the primary key ("id ASC").
- # This also means that this method only works when the primary key is
+ # This also means that this method only works when the cursor column is
# orderable (e.g. an integer or string).
#
+ # NOTE: When using custom columns for batching, they should include at least one unique column
+ # (e.g. primary key) as a tiebreaker. Also, to reduce the likelihood of race conditions,
+ # all columns should be static (unchangeable after it was set).
+ #
# NOTE: By its nature, batch processing is subject to race conditions if
# other processes are modifying the database.
- def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore: nil, order: DEFAULT_ORDER, use_ranges: nil, &block)
- unless Array(order).all? { |ord| [:asc, :desc].include?(ord) }
- raise ArgumentError, ":order must be :asc or :desc or an array consisting of :asc or :desc, got #{order.inspect}"
- end
+ def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore: nil, cursor: primary_key, order: DEFAULT_ORDER, use_ranges: nil, &block)
+ cursor = Array(cursor).map(&:to_s)
+ ensure_valid_options_for_batching!(cursor, start, finish, order)
unless block
- return BatchEnumerator.new(of: of, start: start, finish: finish, relation: self, order: order, use_ranges: use_ranges)
+ return BatchEnumerator.new(of: of, start: start, finish: finish, relation: self, cursor: cursor, order: order, use_ranges: use_ranges)
end
if arel.orders.present?
@@ -261,6 +280,7 @@ module ActiveRecord
relation: self,
start: start,
finish: finish,
+ cursor: cursor,
order: order,
batch_limit: batch_limit,
&block
@@ -271,6 +291,7 @@ module ActiveRecord
start: start,
finish: finish,
load: load,
+ cursor: cursor,
order: order,
use_ranges: use_ranges,
remaining: remaining,
@@ -281,28 +302,51 @@ module ActiveRecord
end
private
- def apply_limits(relation, start, finish, batch_orders)
- relation = apply_start_limit(relation, start, batch_orders) if start
- relation = apply_finish_limit(relation, finish, batch_orders) if finish
+ def ensure_valid_options_for_batching!(cursor, start, finish, order)
+ if start && Array(start).size != cursor.size
+ raise ArgumentError, ":start must contain one value per cursor column"
+ end
+
+ if finish && Array(finish).size != cursor.size
+ raise ArgumentError, ":finish must contain one value per cursor column"
+ end
+
+ if (Array(primary_key) - cursor).any?
+ indexes = model.schema_cache.indexes(table_name)
+ unique_index = indexes.find { |index| index.unique && index.where.nil? && (Array(index.columns) - cursor).empty? }
+
+ unless unique_index
+ raise ArgumentError, ":cursor must include a primary key or other unique column(s)"
+ end
+ end
+
+ if (Array(order) - [:asc, :desc]).any?
+ raise ArgumentError, ":order must be :asc or :desc or an array consisting of :asc or :desc, got #{order.inspect}"
+ end
+ end
+
+ def apply_limits(relation, cursor, start, finish, batch_orders)
+ relation = apply_start_limit(relation, cursor, start, batch_orders) if start
+ relation = apply_finish_limit(relation, cursor, finish, batch_orders) if finish
relation
end
- def apply_start_limit(relation, start, batch_orders)
+ def apply_start_limit(relation, cursor, start, batch_orders)
operators = batch_orders.map do |_column, order|
order == :desc ? :lteq : :gteq
end
- batch_condition(relation, primary_key, start, operators)
+ batch_condition(relation, cursor, start, operators)
end
- def apply_finish_limit(relation, finish, batch_orders)
+ def apply_finish_limit(relation, cursor, finish, batch_orders)
operators = batch_orders.map do |_column, order|
order == :desc ? :gteq : :lteq
end
- batch_condition(relation, primary_key, finish, operators)
+ batch_condition(relation, cursor, finish, operators)
end
- def batch_condition(relation, columns, values, operators)
- cursor_positions = Array(columns).zip(Array(values), operators)
+ def batch_condition(relation, cursor, values, operators)
+ cursor_positions = cursor.zip(Array(values), operators)
first_clause_column, first_clause_value, operator = cursor_positions.pop
where_clause = predicate_builder[first_clause_column, first_clause_value, operator]
@@ -316,9 +360,9 @@ module ActiveRecord
relation.where(where_clause)
end
- def build_batch_orders(order)
- get_the_order_of_primary_key(order).map do |column, ord|
- [column, ord || DEFAULT_ORDER]
+ def build_batch_orders(cursor, order)
+ cursor.zip(Array(order)).map do |column, order_|
+ [column, order_ || DEFAULT_ORDER]
end
end
@@ -332,23 +376,23 @@ module ActiveRecord
end
end
- def get_the_order_of_primary_key(order)
- Array(primary_key).zip(Array(order))
- end
-
- def batch_on_loaded_relation(relation:, start:, finish:, order:, batch_limit:)
+ def batch_on_loaded_relation(relation:, start:, finish:, cursor:, order:, batch_limit:)
records = relation.to_a
- order = build_batch_orders(order).map(&:second)
+ order = build_batch_orders(cursor, order).map(&:second)
if start || finish
records = records.filter do |record|
- (start.nil? || compare_values_for_order(record.id, start, order) >= 0) &&
- (finish.nil? || compare_values_for_order(record.id, finish, order) <= 0)
+ values = record_cursor_values(record, cursor)
+
+ (start.nil? || compare_values_for_order(values, Array(start), order) >= 0) &&
+ (finish.nil? || compare_values_for_order(values, Array(finish), order) <= 0)
end
end
records.sort! do |record1, record2|
- compare_values_for_order(record1.id, record2.id, order)
+ values1 = record_cursor_values(record1, cursor)
+ values2 = record_cursor_values(record2, cursor)
+ compare_values_for_order(values1, values2, order)
end
records.each_slice(batch_limit) do |subrecords|
@@ -361,32 +405,28 @@ module ActiveRecord
nil
end
- # This is a custom implementation of `<=>` operator,
- # which also takes into account how the collection will be ordered.
- def compare_values_for_order(value1, value2, order)
- # Multiple column values.
- if value1.is_a?(Array)
- value1.each_with_index do |element1, index|
- element2 = value2[index]
- direction = order[index]
- comparison = element1 <=> element2
- comparison = -comparison if direction == :desc
- return comparison if comparison != 0
- end
-
- 0
- # Single column values.
- elsif order.first == :asc
- value1 <=> value2
- else
- value2 <=> value1
- end
+ def record_cursor_values(record, cursor)
+ record.attributes.slice(*cursor).values
end
- def batch_on_unloaded_relation(relation:, start:, finish:, load:, order:, use_ranges:, remaining:, batch_limit:)
- batch_orders = build_batch_orders(order)
+ # This is a custom implementation of `<=>` operator,
+ # which also takes into account how the collection will be ordered.
+ def compare_values_for_order(values1, values2, order)
+ values1.each_with_index do |element1, index|
+ element2 = values2[index]
+ direction = order[index]
+ comparison = element1 <=> element2
+ comparison = -comparison if direction == :desc
+ return comparison if comparison != 0
+ end
+
+ 0
+ end
+
+ def batch_on_unloaded_relation(relation:, start:, finish:, load:, cursor:, order:, use_ranges:, remaining:, batch_limit:)
+ batch_orders = build_batch_orders(cursor, order)
relation = relation.reorder(batch_orders.to_h).limit(batch_limit)
- relation = apply_limits(relation, start, finish, batch_orders)
+ relation = apply_limits(relation, cursor, start, finish, batch_orders)
relation.skip_query_cache! # Retaining the results in the query cache would undermine the point of batching
batch_relation = relation
empty_scope = to_sql == model.unscoped.all.to_sql
@@ -394,33 +434,36 @@ module ActiveRecord
loop do
if load
records = batch_relation.records
- ids = records.map(&:id)
- yielded_relation = where(primary_key => ids)
+ values = records.pluck(*cursor)
+ yielded_relation = where(cursor => values)
yielded_relation.load_records(records)
elsif (empty_scope && use_ranges != false) || use_ranges
- ids = batch_relation.ids
- finish = ids.last
+ values = batch_relation.pluck(*cursor)
+
+ finish = values.last
if finish
- yielded_relation = apply_finish_limit(batch_relation, finish, batch_orders)
+ yielded_relation = apply_finish_limit(batch_relation, cursor, finish, batch_orders)
yielded_relation = yielded_relation.except(:limit, :order)
yielded_relation.skip_query_cache!(false)
end
else
- ids = batch_relation.ids
- yielded_relation = where(primary_key => ids)
+ values = batch_relation.pluck(*cursor)
+ yielded_relation = where(cursor => values)
end
- break if ids.empty?
+ break if values.empty?
- primary_key_offset = ids.last
- raise ArgumentError.new("Primary key not included in the custom select clause") unless primary_key_offset
+ if values.flatten.any?(nil)
+ raise ArgumentError, "Not all of the batch cursor columns were included in the custom select clause "\
+ "or some columns contain nil."
+ end
yield yielded_relation
- break if ids.length < batch_limit
+ break if values.length < batch_limit
if limit_value
- remaining -= ids.length
+ remaining -= values.length
if remaining == 0
# Saves a useless iteration when the limit is a multiple of the
@@ -438,7 +481,8 @@ module ActiveRecord
end
operators << (last_order == :desc ? :lt : :gt)
- batch_relation = batch_condition(relation, primary_key, primary_key_offset, operators)
+ cursor_value = values.last
+ batch_relation = batch_condition(relation, cursor, cursor_value, operators)
end
nil
diff --git a/activerecord/lib/active_record/relation/batches/batch_enumerator.rb b/activerecord/lib/active_record/relation/batches/batch_enumerator.rb
index cc2a86bfbc0..78cd753c7bb 100644
--- a/activerecord/lib/active_record/relation/batches/batch_enumerator.rb
+++ b/activerecord/lib/active_record/relation/batches/batch_enumerator.rb
@@ -5,11 +5,12 @@ module ActiveRecord
class BatchEnumerator
include Enumerable
- def initialize(of: 1000, start: nil, finish: nil, relation:, order: :asc, use_ranges: nil) # :nodoc:
+ def initialize(of: 1000, start: nil, finish: nil, relation:, cursor:, order: :asc, use_ranges: nil) # :nodoc:
@of = of
@relation = relation
@start = start
@finish = finish
+ @cursor = cursor
@order = order
@use_ranges = use_ranges
end
@@ -52,7 +53,7 @@ module ActiveRecord
def each_record(&block)
return to_enum(:each_record) unless block_given?
- @relation.to_enum(:in_batches, of: @of, start: @start, finish: @finish, load: true, order: @order).each do |relation|
+ @relation.to_enum(:in_batches, of: @of, start: @start, finish: @finish, load: true, cursor: @cursor, order: @order).each do |relation|
relation.records.each(&block)
end
end
@@ -105,7 +106,7 @@ module ActiveRecord
# relation.update_all(awesome: true)
# end
def each(&block)
- enum = @relation.to_enum(:in_batches, of: @of, start: @start, finish: @finish, load: false, order: @order, use_ranges: @use_ranges)
+ enum = @relation.to_enum(:in_batches, of: @of, start: @start, finish: @finish, load: false, cursor: @cursor, order: @order, use_ranges: @use_ranges)
return enum.each(&block) if block_given?
enum
end
diff --git a/activerecord/lib/active_record/relation/predicate_builder.rb b/activerecord/lib/active_record/relation/predicate_builder.rb
index b682221d5ba..60c2de24e0a 100644
--- a/activerecord/lib/active_record/relation/predicate_builder.rb
+++ b/activerecord/lib/active_record/relation/predicate_builder.rb
@@ -77,6 +77,11 @@ module ActiveRecord
return ["1=0"] if attributes.empty?
attributes.flat_map do |key, value|
+ if key.is_a?(Array) && key.size == 1
+ key = key.first
+ value = value.flatten
+ end
+
if key.is_a?(Array)
queries = Array(value).map do |ids_set|
raise ArgumentError, "Expected corresponding value for #{key} to be an Array" unless ids_set.is_a?(Array)
diff --git a/activerecord/test/cases/batches_test.rb b/activerecord/test/cases/batches_test.rb
index 2e8ccb8617d..3d2660fb8ba 100644
--- a/activerecord/test/cases/batches_test.rb
+++ b/activerecord/test/cases/batches_test.rb
@@ -557,6 +557,27 @@ class EachTest < ActiveRecord::TestCase
assert_equal expected_orders, orders
end
+ def test_in_batches_when_loaded_iterates_using_custom_column
+ c = Post.lease_connection
+ c.add_index(:posts, :title, unique: true)
+ ActiveRecord::Base.schema_cache.clear!
+
+ ordered_posts = Post.order(id: :desc)
+ ordered_posts.load
+
+ posts = []
+
+ assert_no_queries do
+ ordered_posts.in_batches(of: 1, cursor: :id, order: :desc).each_record do |post|
+ posts << post
+ end
+ end
+
+ assert_equal ordered_posts.to_a, posts
+ ensure
+ c.remove_index(:posts, :title)
+ end
+
def test_in_batches_should_return_relations
assert_queries_count(@total + 1) do
Post.in_batches(of: 1) do |relation|
@@ -760,6 +781,79 @@ class EachTest < ActiveRecord::TestCase
assert_equal 2, person.reload.author_id # incremented only once
end
+ def test_in_batches_with_custom_columns_raises_when_start_missing_items
+ assert_raises(ArgumentError, match: ":start must contain one value per cursor column") do
+ Post.in_batches(start: 1, cursor: [:author_id, :id]) { }
+ end
+ end
+
+ def test_in_batches_with_custom_columns_raises_when_finish_missing_items
+ assert_raises(ArgumentError, match: ":finish must contain one value per cursor column") do
+ Post.in_batches(finish: 10, cursor: [:author_id, :id]) { }
+ end
+ end
+
+ def test_in_batches_with_custom_columns_raises_when_non_unique_columns
+ ActiveRecord::Base.schema_cache.clear!
+
+ # Non unique column.
+ assert_raises(ArgumentError, match: /must include a primary key/) do
+ Post.in_batches(cursor: :title) { }
+ end
+
+ # Primary key column.
+ assert_nothing_raised do
+ Post.in_batches(cursor: :id) { }
+ end
+
+ c = Post.lease_connection
+ c.add_index(:posts, :title)
+ ActiveRecord::Base.schema_cache.clear!
+
+ # Non unique indexed column.
+ assert_raises(ArgumentError, match: /must include a primary key/) do
+ Post.in_batches(cursor: :title) { }
+ end
+
+ c.remove_index(:posts, :title)
+
+ if current_adapter?(:PostgreSQLAdapter)
+ c.add_index(:posts, :title, unique: true, where: "id > 5")
+ ActiveRecord::Base.schema_cache.clear!
+
+ # Column having a unique, but partial, index.
+ assert_raises(ArgumentError, match: /must include a primary key/) do
+ Post.in_batches(cursor: :title) { }
+ end
+
+ c.remove_index(:posts, :title)
+ end
+
+ c.add_index(:posts, :title, unique: true)
+ ActiveRecord::Base.schema_cache.clear!
+ assert_nothing_raised do
+ Post.in_batches(cursor: :title) { }
+ end
+ ensure
+ c.remove_index(:posts, :title)
+ end
+
+ def test_in_batches_iterating_using_custom_columns
+ c = Post.lease_connection
+ c.add_index(:posts, :title, unique: true)
+ ActiveRecord::Base.schema_cache.clear!
+
+ expected_posts = Post.order(id: :desc).to_a
+ posts = []
+ Post.in_batches(of: 1, cursor: :id, order: :desc).each_record do |post|
+ posts << post
+ end
+
+ assert_equal expected_posts, posts
+ ensure
+ c.remove_index(:posts, :title)
+ end
+
def test_find_in_batches_should_return_a_sized_enumerator
assert_equal 11, Post.find_in_batches(batch_size: 1).size
assert_equal 6, Post.find_in_batches(batch_size: 2).size
diff --git a/activerecord/test/cases/relation/where_test.rb b/activerecord/test/cases/relation/where_test.rb
index b15eea858b3..8e6dc2103ef 100644
--- a/activerecord/test/cases/relation/where_test.rb
+++ b/activerecord/test/cases/relation/where_test.rb
@@ -124,7 +124,7 @@ module ActiveRecord
assert_match(/Expected corresponding value for.*to be an Array/, error.message)
error = assert_raise ArgumentError do
- Cpk::Book.where([:one] => 1)
+ Cpk::Book.where([:one, :two] => 1)
end
assert_match(/Expected corresponding value for.*to be an Array/, error.message)