From fd52a754f2c6e234ee46834c63cfde02a368d5c6 Mon Sep 17 00:00:00 2001 From: fatkodima Date: Sat, 13 Jul 2024 17:19:06 +0300 Subject: [PATCH] Support batching using custom columns --- activerecord/CHANGELOG.md | 10 + .../lib/active_record/relation/batches.rb | 210 +++++++++++------- .../relation/batches/batch_enumerator.rb | 7 +- .../relation/predicate_builder.rb | 5 + activerecord/test/cases/batches_test.rb | 94 ++++++++ .../test/cases/relation/where_test.rb | 2 +- 6 files changed, 241 insertions(+), 87 deletions(-) diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md index d4dddae62fb..de5c6d917ba 100644 --- a/activerecord/CHANGELOG.md +++ b/activerecord/CHANGELOG.md @@ -1,3 +1,13 @@ +* Support batching using custom columns. + + ```ruby + Product.in_batches(cursor: [:shop_id, :id]) do |relation| + # do something with relation + end + ``` + + *fatkodima* + * Use SQLite `IMMEDIATE` transactions when possible. Transactions run against the SQLite3 adapter default to IMMEDIATE mode to improve concurrency support and avoid busy exceptions. diff --git a/activerecord/lib/active_record/relation/batches.rb b/activerecord/lib/active_record/relation/batches.rb index 94e30d6bb81..ef00d5a9ac4 100644 --- a/activerecord/lib/active_record/relation/batches.rb +++ b/activerecord/lib/active_record/relation/batches.rb @@ -5,7 +5,7 @@ require "active_record/relation/batches/batch_enumerator" module ActiveRecord # = Active Record \Batches module Batches - ORDER_IGNORE_MESSAGE = "Scoped order is ignored, it's forced to be batch order." + ORDER_IGNORE_MESSAGE = "Scoped order is ignored, use :cursor with :order to configure custom order." DEFAULT_ORDER = :asc # Looping through a collection of records from the database @@ -35,11 +35,13 @@ module ActiveRecord # # ==== Options # * :batch_size - Specifies the size of the batch. Defaults to 1000. - # * :start - Specifies the primary key value to start from, inclusive of the value. - # * :finish - Specifies the primary key value to end at, inclusive of the value. + # * :start - Specifies the cursor column value to start from, inclusive of the value. + # * :finish - Specifies the cursor column value to end at, inclusive of the value. # * :error_on_ignore - Overrides the application config to specify if an error should be raised when # an order is present in the relation. - # * :order - Specifies the primary key order (can be +:asc+ or +:desc+ or an array consisting + # * :cursor - Specifies the column to use for batching (can be a column name or an array + # of column names). Defaults to primary key. + # * :order - Specifies the cursor column order (can be +:asc+ or +:desc+ or an array consisting # of :asc or :desc). Defaults to +:asc+. # # class Order < ActiveRecord::Base @@ -71,20 +73,25 @@ module ActiveRecord # # NOTE: Order can be ascending (:asc) or descending (:desc). It is automatically set to # ascending on the primary key ("id ASC"). - # This also means that this method only works when the primary key is + # This also means that this method only works when the cursor column is # orderable (e.g. an integer or string). # + # NOTE: When using custom columns for batching, they should include at least one unique column + # (e.g. primary key) as a tiebreaker. Also, to reduce the likelihood of race conditions, + # all columns should be static (unchangeable after it was set). + # # NOTE: By its nature, batch processing is subject to race conditions if # other processes are modifying the database. - def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, order: DEFAULT_ORDER, &block) + def find_each(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, cursor: primary_key, order: DEFAULT_ORDER, &block) if block_given? - find_in_batches(start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, order: order) do |records| + find_in_batches(start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do |records| records.each(&block) end else - enum_for(:find_each, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, order: order) do + enum_for(:find_each, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do relation = self - apply_limits(relation, start, finish, build_batch_orders(order)).size + cursor = Array(cursor) + apply_limits(relation, cursor, start, finish, build_batch_orders(cursor, order)).size end end end @@ -109,11 +116,13 @@ module ActiveRecord # # ==== Options # * :batch_size - Specifies the size of the batch. Defaults to 1000. - # * :start - Specifies the primary key value to start from, inclusive of the value. - # * :finish - Specifies the primary key value to end at, inclusive of the value. + # * :start - Specifies the cursor column value to start from, inclusive of the value. + # * :finish - Specifies the cursor column value to end at, inclusive of the value. # * :error_on_ignore - Overrides the application config to specify if an error should be raised when # an order is present in the relation. - # * :order - Specifies the primary key order (can be +:asc+ or +:desc+ or an array consisting + # * :cursor - Specifies the column to use for batching (can be a column name or an array + # of column names). Defaults to primary key. + # * :order - Specifies the cursor column order (can be +:asc+ or +:desc+ or an array consisting # of :asc or :desc). Defaults to +:asc+. # # class Order < ActiveRecord::Base @@ -140,21 +149,26 @@ module ActiveRecord # # NOTE: Order can be ascending (:asc) or descending (:desc). It is automatically set to # ascending on the primary key ("id ASC"). - # This also means that this method only works when the primary key is + # This also means that this method only works when the cursor column is # orderable (e.g. an integer or string). # + # NOTE: When using custom columns for batching, they should include at least one unique column + # (e.g. primary key) as a tiebreaker. Also, to reduce the likelihood of race conditions, + # all columns should be static (unchangeable after it was set). + # # NOTE: By its nature, batch processing is subject to race conditions if # other processes are modifying the database. - def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, order: DEFAULT_ORDER) + def find_in_batches(start: nil, finish: nil, batch_size: 1000, error_on_ignore: nil, cursor: primary_key, order: DEFAULT_ORDER) relation = self unless block_given? - return to_enum(:find_in_batches, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, order: order) do - total = apply_limits(relation, start, finish, build_batch_orders(order)).size + return to_enum(:find_in_batches, start: start, finish: finish, batch_size: batch_size, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do + cursor = Array(cursor) + total = apply_limits(relation, cursor, start, finish, build_batch_orders(cursor, order)).size (total - 1).div(batch_size) + 1 end end - in_batches(of: batch_size, start: start, finish: finish, load: true, error_on_ignore: error_on_ignore, order: order) do |batch| + in_batches(of: batch_size, start: start, finish: finish, load: true, error_on_ignore: error_on_ignore, cursor: cursor, order: order) do |batch| yield batch.to_a end end @@ -183,11 +197,13 @@ module ActiveRecord # ==== Options # * :of - Specifies the size of the batch. Defaults to 1000. # * :load - Specifies if the relation should be loaded. Defaults to false. - # * :start - Specifies the primary key value to start from, inclusive of the value. - # * :finish - Specifies the primary key value to end at, inclusive of the value. + # * :start - Specifies the cursor column value to start from, inclusive of the value. + # * :finish - Specifies the cursor column value to end at, inclusive of the value. # * :error_on_ignore - Overrides the application config to specify if an error should be raised when # an order is present in the relation. - # * :order - Specifies the primary key order (can be +:asc+ or +:desc+ or an array consisting + # * :cursor - Specifies the column to use for batching (can be a column name or an array + # of column names). Defaults to primary key. + # * :order - Specifies the cursor column order (can be +:asc+ or +:desc+ or an array consisting # of :asc or :desc). Defaults to +:asc+. # # class Order < ActiveRecord::Base @@ -231,18 +247,21 @@ module ActiveRecord # # NOTE: Order can be ascending (:asc) or descending (:desc). It is automatically set to # ascending on the primary key ("id ASC"). - # This also means that this method only works when the primary key is + # This also means that this method only works when the cursor column is # orderable (e.g. an integer or string). # + # NOTE: When using custom columns for batching, they should include at least one unique column + # (e.g. primary key) as a tiebreaker. Also, to reduce the likelihood of race conditions, + # all columns should be static (unchangeable after it was set). + # # NOTE: By its nature, batch processing is subject to race conditions if # other processes are modifying the database. - def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore: nil, order: DEFAULT_ORDER, use_ranges: nil, &block) - unless Array(order).all? { |ord| [:asc, :desc].include?(ord) } - raise ArgumentError, ":order must be :asc or :desc or an array consisting of :asc or :desc, got #{order.inspect}" - end + def in_batches(of: 1000, start: nil, finish: nil, load: false, error_on_ignore: nil, cursor: primary_key, order: DEFAULT_ORDER, use_ranges: nil, &block) + cursor = Array(cursor).map(&:to_s) + ensure_valid_options_for_batching!(cursor, start, finish, order) unless block - return BatchEnumerator.new(of: of, start: start, finish: finish, relation: self, order: order, use_ranges: use_ranges) + return BatchEnumerator.new(of: of, start: start, finish: finish, relation: self, cursor: cursor, order: order, use_ranges: use_ranges) end if arel.orders.present? @@ -261,6 +280,7 @@ module ActiveRecord relation: self, start: start, finish: finish, + cursor: cursor, order: order, batch_limit: batch_limit, &block @@ -271,6 +291,7 @@ module ActiveRecord start: start, finish: finish, load: load, + cursor: cursor, order: order, use_ranges: use_ranges, remaining: remaining, @@ -281,28 +302,51 @@ module ActiveRecord end private - def apply_limits(relation, start, finish, batch_orders) - relation = apply_start_limit(relation, start, batch_orders) if start - relation = apply_finish_limit(relation, finish, batch_orders) if finish + def ensure_valid_options_for_batching!(cursor, start, finish, order) + if start && Array(start).size != cursor.size + raise ArgumentError, ":start must contain one value per cursor column" + end + + if finish && Array(finish).size != cursor.size + raise ArgumentError, ":finish must contain one value per cursor column" + end + + if (Array(primary_key) - cursor).any? + indexes = model.schema_cache.indexes(table_name) + unique_index = indexes.find { |index| index.unique && index.where.nil? && (Array(index.columns) - cursor).empty? } + + unless unique_index + raise ArgumentError, ":cursor must include a primary key or other unique column(s)" + end + end + + if (Array(order) - [:asc, :desc]).any? + raise ArgumentError, ":order must be :asc or :desc or an array consisting of :asc or :desc, got #{order.inspect}" + end + end + + def apply_limits(relation, cursor, start, finish, batch_orders) + relation = apply_start_limit(relation, cursor, start, batch_orders) if start + relation = apply_finish_limit(relation, cursor, finish, batch_orders) if finish relation end - def apply_start_limit(relation, start, batch_orders) + def apply_start_limit(relation, cursor, start, batch_orders) operators = batch_orders.map do |_column, order| order == :desc ? :lteq : :gteq end - batch_condition(relation, primary_key, start, operators) + batch_condition(relation, cursor, start, operators) end - def apply_finish_limit(relation, finish, batch_orders) + def apply_finish_limit(relation, cursor, finish, batch_orders) operators = batch_orders.map do |_column, order| order == :desc ? :gteq : :lteq end - batch_condition(relation, primary_key, finish, operators) + batch_condition(relation, cursor, finish, operators) end - def batch_condition(relation, columns, values, operators) - cursor_positions = Array(columns).zip(Array(values), operators) + def batch_condition(relation, cursor, values, operators) + cursor_positions = cursor.zip(Array(values), operators) first_clause_column, first_clause_value, operator = cursor_positions.pop where_clause = predicate_builder[first_clause_column, first_clause_value, operator] @@ -316,9 +360,9 @@ module ActiveRecord relation.where(where_clause) end - def build_batch_orders(order) - get_the_order_of_primary_key(order).map do |column, ord| - [column, ord || DEFAULT_ORDER] + def build_batch_orders(cursor, order) + cursor.zip(Array(order)).map do |column, order_| + [column, order_ || DEFAULT_ORDER] end end @@ -332,23 +376,23 @@ module ActiveRecord end end - def get_the_order_of_primary_key(order) - Array(primary_key).zip(Array(order)) - end - - def batch_on_loaded_relation(relation:, start:, finish:, order:, batch_limit:) + def batch_on_loaded_relation(relation:, start:, finish:, cursor:, order:, batch_limit:) records = relation.to_a - order = build_batch_orders(order).map(&:second) + order = build_batch_orders(cursor, order).map(&:second) if start || finish records = records.filter do |record| - (start.nil? || compare_values_for_order(record.id, start, order) >= 0) && - (finish.nil? || compare_values_for_order(record.id, finish, order) <= 0) + values = record_cursor_values(record, cursor) + + (start.nil? || compare_values_for_order(values, Array(start), order) >= 0) && + (finish.nil? || compare_values_for_order(values, Array(finish), order) <= 0) end end records.sort! do |record1, record2| - compare_values_for_order(record1.id, record2.id, order) + values1 = record_cursor_values(record1, cursor) + values2 = record_cursor_values(record2, cursor) + compare_values_for_order(values1, values2, order) end records.each_slice(batch_limit) do |subrecords| @@ -361,32 +405,28 @@ module ActiveRecord nil end - # This is a custom implementation of `<=>` operator, - # which also takes into account how the collection will be ordered. - def compare_values_for_order(value1, value2, order) - # Multiple column values. - if value1.is_a?(Array) - value1.each_with_index do |element1, index| - element2 = value2[index] - direction = order[index] - comparison = element1 <=> element2 - comparison = -comparison if direction == :desc - return comparison if comparison != 0 - end - - 0 - # Single column values. - elsif order.first == :asc - value1 <=> value2 - else - value2 <=> value1 - end + def record_cursor_values(record, cursor) + record.attributes.slice(*cursor).values end - def batch_on_unloaded_relation(relation:, start:, finish:, load:, order:, use_ranges:, remaining:, batch_limit:) - batch_orders = build_batch_orders(order) + # This is a custom implementation of `<=>` operator, + # which also takes into account how the collection will be ordered. + def compare_values_for_order(values1, values2, order) + values1.each_with_index do |element1, index| + element2 = values2[index] + direction = order[index] + comparison = element1 <=> element2 + comparison = -comparison if direction == :desc + return comparison if comparison != 0 + end + + 0 + end + + def batch_on_unloaded_relation(relation:, start:, finish:, load:, cursor:, order:, use_ranges:, remaining:, batch_limit:) + batch_orders = build_batch_orders(cursor, order) relation = relation.reorder(batch_orders.to_h).limit(batch_limit) - relation = apply_limits(relation, start, finish, batch_orders) + relation = apply_limits(relation, cursor, start, finish, batch_orders) relation.skip_query_cache! # Retaining the results in the query cache would undermine the point of batching batch_relation = relation empty_scope = to_sql == model.unscoped.all.to_sql @@ -394,33 +434,36 @@ module ActiveRecord loop do if load records = batch_relation.records - ids = records.map(&:id) - yielded_relation = where(primary_key => ids) + values = records.pluck(*cursor) + yielded_relation = where(cursor => values) yielded_relation.load_records(records) elsif (empty_scope && use_ranges != false) || use_ranges - ids = batch_relation.ids - finish = ids.last + values = batch_relation.pluck(*cursor) + + finish = values.last if finish - yielded_relation = apply_finish_limit(batch_relation, finish, batch_orders) + yielded_relation = apply_finish_limit(batch_relation, cursor, finish, batch_orders) yielded_relation = yielded_relation.except(:limit, :order) yielded_relation.skip_query_cache!(false) end else - ids = batch_relation.ids - yielded_relation = where(primary_key => ids) + values = batch_relation.pluck(*cursor) + yielded_relation = where(cursor => values) end - break if ids.empty? + break if values.empty? - primary_key_offset = ids.last - raise ArgumentError.new("Primary key not included in the custom select clause") unless primary_key_offset + if values.flatten.any?(nil) + raise ArgumentError, "Not all of the batch cursor columns were included in the custom select clause "\ + "or some columns contain nil." + end yield yielded_relation - break if ids.length < batch_limit + break if values.length < batch_limit if limit_value - remaining -= ids.length + remaining -= values.length if remaining == 0 # Saves a useless iteration when the limit is a multiple of the @@ -438,7 +481,8 @@ module ActiveRecord end operators << (last_order == :desc ? :lt : :gt) - batch_relation = batch_condition(relation, primary_key, primary_key_offset, operators) + cursor_value = values.last + batch_relation = batch_condition(relation, cursor, cursor_value, operators) end nil diff --git a/activerecord/lib/active_record/relation/batches/batch_enumerator.rb b/activerecord/lib/active_record/relation/batches/batch_enumerator.rb index cc2a86bfbc0..78cd753c7bb 100644 --- a/activerecord/lib/active_record/relation/batches/batch_enumerator.rb +++ b/activerecord/lib/active_record/relation/batches/batch_enumerator.rb @@ -5,11 +5,12 @@ module ActiveRecord class BatchEnumerator include Enumerable - def initialize(of: 1000, start: nil, finish: nil, relation:, order: :asc, use_ranges: nil) # :nodoc: + def initialize(of: 1000, start: nil, finish: nil, relation:, cursor:, order: :asc, use_ranges: nil) # :nodoc: @of = of @relation = relation @start = start @finish = finish + @cursor = cursor @order = order @use_ranges = use_ranges end @@ -52,7 +53,7 @@ module ActiveRecord def each_record(&block) return to_enum(:each_record) unless block_given? - @relation.to_enum(:in_batches, of: @of, start: @start, finish: @finish, load: true, order: @order).each do |relation| + @relation.to_enum(:in_batches, of: @of, start: @start, finish: @finish, load: true, cursor: @cursor, order: @order).each do |relation| relation.records.each(&block) end end @@ -105,7 +106,7 @@ module ActiveRecord # relation.update_all(awesome: true) # end def each(&block) - enum = @relation.to_enum(:in_batches, of: @of, start: @start, finish: @finish, load: false, order: @order, use_ranges: @use_ranges) + enum = @relation.to_enum(:in_batches, of: @of, start: @start, finish: @finish, load: false, cursor: @cursor, order: @order, use_ranges: @use_ranges) return enum.each(&block) if block_given? enum end diff --git a/activerecord/lib/active_record/relation/predicate_builder.rb b/activerecord/lib/active_record/relation/predicate_builder.rb index b682221d5ba..60c2de24e0a 100644 --- a/activerecord/lib/active_record/relation/predicate_builder.rb +++ b/activerecord/lib/active_record/relation/predicate_builder.rb @@ -77,6 +77,11 @@ module ActiveRecord return ["1=0"] if attributes.empty? attributes.flat_map do |key, value| + if key.is_a?(Array) && key.size == 1 + key = key.first + value = value.flatten + end + if key.is_a?(Array) queries = Array(value).map do |ids_set| raise ArgumentError, "Expected corresponding value for #{key} to be an Array" unless ids_set.is_a?(Array) diff --git a/activerecord/test/cases/batches_test.rb b/activerecord/test/cases/batches_test.rb index 2e8ccb8617d..3d2660fb8ba 100644 --- a/activerecord/test/cases/batches_test.rb +++ b/activerecord/test/cases/batches_test.rb @@ -557,6 +557,27 @@ class EachTest < ActiveRecord::TestCase assert_equal expected_orders, orders end + def test_in_batches_when_loaded_iterates_using_custom_column + c = Post.lease_connection + c.add_index(:posts, :title, unique: true) + ActiveRecord::Base.schema_cache.clear! + + ordered_posts = Post.order(id: :desc) + ordered_posts.load + + posts = [] + + assert_no_queries do + ordered_posts.in_batches(of: 1, cursor: :id, order: :desc).each_record do |post| + posts << post + end + end + + assert_equal ordered_posts.to_a, posts + ensure + c.remove_index(:posts, :title) + end + def test_in_batches_should_return_relations assert_queries_count(@total + 1) do Post.in_batches(of: 1) do |relation| @@ -760,6 +781,79 @@ class EachTest < ActiveRecord::TestCase assert_equal 2, person.reload.author_id # incremented only once end + def test_in_batches_with_custom_columns_raises_when_start_missing_items + assert_raises(ArgumentError, match: ":start must contain one value per cursor column") do + Post.in_batches(start: 1, cursor: [:author_id, :id]) { } + end + end + + def test_in_batches_with_custom_columns_raises_when_finish_missing_items + assert_raises(ArgumentError, match: ":finish must contain one value per cursor column") do + Post.in_batches(finish: 10, cursor: [:author_id, :id]) { } + end + end + + def test_in_batches_with_custom_columns_raises_when_non_unique_columns + ActiveRecord::Base.schema_cache.clear! + + # Non unique column. + assert_raises(ArgumentError, match: /must include a primary key/) do + Post.in_batches(cursor: :title) { } + end + + # Primary key column. + assert_nothing_raised do + Post.in_batches(cursor: :id) { } + end + + c = Post.lease_connection + c.add_index(:posts, :title) + ActiveRecord::Base.schema_cache.clear! + + # Non unique indexed column. + assert_raises(ArgumentError, match: /must include a primary key/) do + Post.in_batches(cursor: :title) { } + end + + c.remove_index(:posts, :title) + + if current_adapter?(:PostgreSQLAdapter) + c.add_index(:posts, :title, unique: true, where: "id > 5") + ActiveRecord::Base.schema_cache.clear! + + # Column having a unique, but partial, index. + assert_raises(ArgumentError, match: /must include a primary key/) do + Post.in_batches(cursor: :title) { } + end + + c.remove_index(:posts, :title) + end + + c.add_index(:posts, :title, unique: true) + ActiveRecord::Base.schema_cache.clear! + assert_nothing_raised do + Post.in_batches(cursor: :title) { } + end + ensure + c.remove_index(:posts, :title) + end + + def test_in_batches_iterating_using_custom_columns + c = Post.lease_connection + c.add_index(:posts, :title, unique: true) + ActiveRecord::Base.schema_cache.clear! + + expected_posts = Post.order(id: :desc).to_a + posts = [] + Post.in_batches(of: 1, cursor: :id, order: :desc).each_record do |post| + posts << post + end + + assert_equal expected_posts, posts + ensure + c.remove_index(:posts, :title) + end + def test_find_in_batches_should_return_a_sized_enumerator assert_equal 11, Post.find_in_batches(batch_size: 1).size assert_equal 6, Post.find_in_batches(batch_size: 2).size diff --git a/activerecord/test/cases/relation/where_test.rb b/activerecord/test/cases/relation/where_test.rb index b15eea858b3..8e6dc2103ef 100644 --- a/activerecord/test/cases/relation/where_test.rb +++ b/activerecord/test/cases/relation/where_test.rb @@ -124,7 +124,7 @@ module ActiveRecord assert_match(/Expected corresponding value for.*to be an Array/, error.message) error = assert_raise ArgumentError do - Cpk::Book.where([:one] => 1) + Cpk::Book.where([:one, :two] => 1) end assert_match(/Expected corresponding value for.*to be an Array/, error.message)