re-implement in_batches and friends

 * do everything as in_batches, returning a relation; properly
   support the `load` param in each backend
 * plumb strategy through all entry points so it can be explicit
 * special case in_batches.delete_all with no explicit strategy to
   do a loop on a limited delete_all (avoids a dumb ORDER BY, or
   having to transfer ids back and forth)
 * since in_batches can easily be used with pluck now that a relation
   is returned, just make find_in_batches_with_temp_table a shim that
   does it the "nice" way (see the usage sketch below)
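
For illustration, a rough sketch of how the reworked API reads; `Message`,
its scope, and `process_ids` are hypothetical stand-ins, not part of this
commit:

    # each yielded batch is now a real relation, so relation methods like
    # pluck work directly on it; under the :copy/:cursor strategies the ids
    # are already in memory, so no extra query is issued
    Message.where(workflow_state: 'stale').in_batches(of: 500) do |batch|
      process_ids(batch.pluck(:id)) # hypothetical helper
    end

    # the strategy can be forced explicitly at any entry point
    Message.find_each(strategy: :id) { |m| m.touch }

    # with no explicit strategy, in_batches.delete_all loops on a plain
    # limited delete_all instead of selecting ids first
    Message.where('created_at < ?', 30.days.ago).in_batches.delete_all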

Change-Id: I716f188cdf676a725588f94a1036981ae798b09c
Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/266882
Tested-by: Service Cloud Jenkins <svc.cloudjenkins@instructure.com>
Reviewed-by: Jacob Burroughs <jburroughs@instructure.com>
QA-Review: Cody Cutrer <cody@instructure.com>
Product-Review: Cody Cutrer <cody@instructure.com>
Cody Cutrer 2021-06-10 16:32:32 -06:00
parent 7d4afba4b0
commit 230033611d
16 changed files with 575 additions and 406 deletions


@@ -108,8 +108,8 @@ class Attachments::GarbageCollector
# Once you're confident and don't want to revert, clean up the DB rows
def delete_rows
raise "Cannot delete rows in dry_run mode" if dry_run
while deleted_scope.where.not(:root_attachment_id => nil).limit(1000).delete_all > 0; end
while deleted_scope.limit(1000).delete_all > 0; end
deleted_scope.where.not(root_attachment_id: nil).in_batches.delete_all
deleted_scope.delete_all
end
private


@@ -63,7 +63,7 @@ class ExternalFeed < ActiveRecord::Base
}
def destroy_entries_and_unlink_topics
while self.external_feed_entries.limit(100).delete_all > 0; end
external_feed_entries.in_batches(of: 100).delete_all
while self.discussion_topics.limit(100).update_all(:external_feed_id => nil) > 0; end
end


@@ -27,8 +27,7 @@ class SisBatchError < ActiveRecord::Base
scope :warnings, -> {where(failure: false)}
def self.cleanup_old_errors
cleanup = expired_errors.limit(10_000)
while cleanup.delete_all > 0; end
expired_errors.in_batches(of: 10_000).delete_all
end
def description


@@ -763,31 +763,396 @@ module ActiveRecord
end
module UsefulFindInBatches
def find_in_batches(start: nil, strategy: nil, **kwargs, &block)
# prefer copy unless we're in a transaction (which would be bad,
# because we might open a separate connection in the block, and not
# see the contents of our current transaction)
if connection.open_transactions == 0 && !start && eager_load_values.empty? && !ActiveRecord::Base.in_migration && !strategy || strategy == :copy
self.activate { |r| r.find_in_batches_with_copy(**kwargs, &block); nil }
elsif strategy == :pluck_ids
self.activate { |r| r.find_in_batches_with_pluck_ids(**kwargs, &block); nil }
elsif should_use_cursor? && !start && eager_load_values.empty? && !strategy || strategy == :cursor
self.activate { |r| r.find_in_batches_with_cursor(**kwargs, &block); nil }
elsif find_in_batches_needs_temp_table? && !strategy || strategy == :temp_table
if start
raise ArgumentError.new("GROUP and ORDER are incompatible with :start, as is an explicit select without the primary key")
# add the strategy param
def find_each(start: nil, finish: nil, **kwargs)
if block_given?
find_in_batches(start: start, finish: finish, **kwargs) do |records|
records.each { |record| yield record }
end
unless eager_load_values.empty?
raise ArgumentError.new("GROUP and ORDER are incompatible with `eager_load`, as is an explicit select without the primary key")
end
self.activate { |r| r.find_in_batches_with_temp_table(**kwargs, &block); nil }
else
self.activate { |r| r.call_super(:find_in_batches, UsefulFindInBatches, start: start, **kwargs, &block); nil }
enum_for(:find_each, start: start, finish: finish, **kwargs) do
relation = self
apply_limits(relation, start, finish).size
end
end
end
# add the strategy param
def find_in_batches(batch_size: 1000, start: nil, finish: nil, **kwargs)
relation = self
unless block_given?
return to_enum(:find_in_batches, start: start, finish: finish, batch_size: batch_size, **kwargs) do
total = apply_limits(relation, start, finish).size
(total - 1).div(batch_size) + 1
end
end
in_batches(of: batch_size, start: start, finish: finish, load: true, **kwargs) do |batch|
yield batch.to_a
end
end
# back-compat for pluck
def find_in_batches_with_temp_table(batch_size: 1000, pluck: nil, **kwargs, &block)
if pluck
select(*pluck).in_batches(of: batch_size, strategy: :temp_table, **kwargs).pluck(*pluck, &block)
return
end
find_in_batches(strategy: :temp_table, batch_size: batch_size, **kwargs, &block)
end
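# for example (hypothetical scope/columns), a legacy call like
#   Enrollment.find_in_batches_with_temp_table(pluck: [:id, :user_id]) { |rows| ... }
# now just delegates to
#   Enrollment.select(:id, :user_id).in_batches(strategy: :temp_table)
#             .pluck(:id, :user_id) { |rows| ... }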
def in_batches(strategy: nil, start: nil, finish: nil, **kwargs, &block)
unless block_given?
return ActiveRecord::Batches::BatchEnumerator.new(strategy: strategy, start: start, relation: self, **kwargs)
end
strategy ||= infer_in_batches_strategy
if strategy == :id
raise ArgumentError, "GROUP BY is incompatible with :id batches strategy" unless group_values.empty?
return activate { |r| r.call_super(:in_batches, UsefulFindInBatches, start: start, finish: finish, **kwargs, &block) }
end
kwargs.delete(:error_on_ignore)
activate { |r| r.send("in_batches_with_#{strategy}", start: start, finish: finish, **kwargs, &block); nil }
end
def infer_in_batches_strategy
strategy ||= :copy if in_batches_can_use_copy?
strategy ||= :cursor if in_batches_can_use_cursor?
strategy ||= :temp_table if in_batches_needs_temp_table?
strategy || :id
end
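# rough inference order (a paraphrase of the predicates below, assuming the
# PostgreSQL adapter): :copy when there is no open transaction, no eager_load,
# and we're not in a migration; :cursor on a secondary or read-only connection
# without eager_load; :temp_table when ORDER BY, GROUP BY, DISTINCT, or a
# select omitting the primary key forces materialization; otherwise :id
# (the stock Rails primary-key-range batching)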
private
def in_batches_can_use_copy?
connection.open_transactions == 0 && eager_load_values.empty? && !ActiveRecord::Base.in_migration
end
def in_batches_can_use_cursor?
eager_load_values.empty? && (GuardRail.environment == :secondary || connection.readonly?)
end
def in_batches_needs_temp_table?
order_values.any? ||
group_values.any? ||
select_values.to_s =~ /DISTINCT/i ||
distinct_value ||
in_batches_select_values_necessitate_temp_table?
end
def in_batches_select_values_necessitate_temp_table?
return false if select_values.blank?
selects = select_values.flat_map { |sel| sel.to_s.split(",").map(&:strip) }
id_keys = [primary_key, "*", "#{table_name}.#{primary_key}", "#{table_name}.*"]
id_keys.all? { |k| !selects.include?(k) }
end
def in_batches_with_cursor(of: 1000, start: nil, finish: nil, load: false)
klass.transaction do
relation = apply_limits(clone, start, finish)
relation.skip_query_cache!
unless load
relation = relation.except(:select).select(primary_key)
end
sql = relation.to_sql
cursor = "#{table_name}_in_batches_cursor_#{sql.hash.abs.to_s(36)}"
connection.execute("DECLARE #{cursor} CURSOR FOR #{sql}")
loop do
if load
records = connection.uncached { klass.find_by_sql("FETCH FORWARD #{of} FROM #{cursor}") }
ids = records.map(&:id)
preload_associations(records)
yielded_relation = where(primary_key => ids).preload(includes_values + preload_values)
yielded_relation.send(:load_records, records)
else
ids = connection.uncached { connection.select_values("FETCH FORWARD #{of} FROM #{cursor}") }
yielded_relation = where(primary_key => ids).preload(includes_values + preload_values)
yielded_relation = yielded_relation.extending(BatchWithColumnsPreloaded).set_values(ids)
end
break if ids.empty?
yield yielded_relation
break if ids.size < of
end
ensure
unless $!.is_a?(ActiveRecord::StatementInvalid)
connection.execute("CLOSE #{cursor}")
end
end
end
def in_batches_with_copy(of: 1000, start: nil, finish: nil, load: false)
limited_query = limit(0).to_sql
relation = self
relation_for_copy = apply_limits(relation, start, finish)
unless load
relation_for_copy = relation_for_copy.except(:select).select(primary_key)
end
full_query = "COPY (#{relation_for_copy.to_sql}) TO STDOUT"
conn = connection
full_query = conn.annotate_sql(full_query) if defined?(Marginalia)
pool = conn.pool
# remove the connection from the pool so that any queries executed
# while we're running this will get a new connection
pool.remove(conn)
checkin = -> do
pool&.synchronize do
pool.send(:adopt_connection, conn)
pool.checkin(conn)
if (conn2 = klass.connection) != conn
# checkin the secondary connection that was established, so that the
# next usage will immediately checkout the original connection.
# this ensures original transaction state is preserved
pool.checkin(conn2)
# the pool uses a LIFO queue, so put the correct connection at the front
# by pretending to explicitly check it out and then return it
conn.lease
pool.checkin(conn)
end
klass.connection
end
pool = nil
end
# make sure to log _something_, even if the dbtime is totally off
conn.send(:log, full_query, "#{klass.name} Load") do
decoder = if load
# set up all our metadata based on a dummy query (COPY doesn't return any metadata)
result = conn.raw_connection.exec(limited_query)
type_map = conn.raw_connection.type_map_for_results.build_column_map(result)
# see PostgreSQLAdapter#exec_query
types = {}
fields = result.fields
fields.each_with_index do |fname, i|
ftype = result.ftype i
fmod = result.fmod i
types[fname] = conn.send(:get_oid_type, ftype, fmod, fname)
end
column_types = types.dup
columns_hash.each_key { |k| column_types.delete k }
PG::TextDecoder::CopyRow.new(type_map: type_map)
else
pkey_oid = columns_hash[primary_key].sql_type_metadata.oid
# this is really dumb that we have to manually search through this, but
# PG::TypeMapByOid doesn't have a direct lookup method
coder = conn.raw_connection.type_map_for_results.coders.find { |c| c.oid == pkey_oid }
PG::TextDecoder::CopyRow.new(type_map: PG::TypeMapByColumn.new([coder]))
end
rows = []
build_relation = -> do
if load
records = ActiveRecord::Result.new(fields, rows, types).map { |record| instantiate(record, column_types) }
ids = records.map(&:id)
yielded_relation = relation.where(primary_key => ids)
preload_associations(records)
yielded_relation.send(:load_records, records)
else
ids = rows.map(&:first)
yielded_relation = relation.where(primary_key => ids)
yielded_relation = yielded_relation.extending(BatchWithColumnsPreloaded).set_values(ids)
end
yielded_relation
end
conn.raw_connection.copy_data(full_query, decoder) do
while (row = conn.raw_connection.get_copy_data)
rows << row
if rows.size == of
yield build_relation.call
rows = []
end
end
end
# return the connection now, in case there was only 1 batch, we can avoid a separate connection if the block needs it
checkin.call
unless rows.empty?
yield build_relation.call
end
end
nil
ensure
# put the connection back in the pool for reuse
checkin&.call
end
# in some cases we're doing a lot of work inside
# the yielded block, and holding open a transaction
# or even a connection while we do all that work can
# be a problem for the database, especially if a lot
# of these are happening at once. This strategy
# makes one query to hold onto all the IDs needed for the
# iteration (make sure they'll fit in memory, or you could be sad)
# and yields the objects in batches in the same order as the scope specified
# so the DB connection can be fully recycled during each block.
def in_batches_with_pluck_ids(of: 1000, start: nil, finish: nil, load: false)
relation = apply_limits(self, start, finish)
all_object_ids = relation.pluck(:id)
current_order_values = order_values
all_object_ids.in_groups_of(of, false) do |id_batch| # false: don't pad the last group with nils
object_batch = klass.unscoped.where(id: id_batch).order(current_order_values).preload(includes_values + preload_values)
yield object_batch
end
end
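# e.g. (hypothetical model and scope):
#   Submission.where(course_id: course).order(:submitted_at)
#             .in_batches(strategy: :pluck_ids, of: 500) { |batch| ... }
# holds no connection open between batches, at the cost of keeping every id
# in memory for the duration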
def in_batches_with_temp_table(of: 1000, start: nil, finish: nil, load: false, ignore_transaction: false)
Shard.current.database_server.unguard do
can_do_it = ignore_transaction ||
Rails.env.production? ||
ActiveRecord::Base.in_migration ||
GuardRail.environment == :deploy ||
(!Rails.env.test? && connection.open_transactions > 0) ||
ActiveRecord::Base.in_transaction_in_test?
unless can_do_it
raise ArgumentError, "in_batches with temp_table probably won't work outside a migration
and outside a transaction. Unfortunately, it's impossible to automatically
determine a better way to do it that will work correctly. You can try
switching to secondary first (then switching to primary if you modify anything
inside your loop), wrapping in a transaction (but be wary of locking records
for the duration of your query if you do any writes in your loop), or not
forcing in_batches to use a temp table (avoiding custom selects,
group, or order)."
end
relation = apply_limits(self, start, finish)
sql = relation.to_sql
table = "#{table_name}_in_batches_temp_table_#{sql.hash.abs.to_s(36)}"
table = table[-63..-1] if table.length > 63
remaining = connection.update("CREATE TEMPORARY TABLE #{table} AS #{sql}")
begin
return if remaining.zero?
if remaining > of
begin
old_proc = connection.raw_connection.set_notice_processor {}
index = if (select_values.empty? || select_values.any? { |v| v.to_s == primary_key.to_s }) && order_values.empty?
connection.execute(%{CREATE INDEX "temp_primary_key" ON #{connection.quote_local_table_name(table)}(#{connection.quote_column_name(primary_key)})})
primary_key.to_s
else
connection.execute "ALTER TABLE #{table} ADD temp_primary_key SERIAL PRIMARY KEY"
'temp_primary_key'
end
ensure
connection.raw_connection.set_notice_processor(&old_proc) if old_proc
end
end
klass.unscoped do
batch_relation = klass.from(table).select("*").limit(of).preload(includes_values + preload_values)
batch_relation = batch_relation.order(Arel.sql(connection.quote_column_name(index))) if index
yielded_relation = batch_relation
loop do
yield yielded_relation
remaining -= of
break if remaining <= 0
last_value = if yielded_relation.loaded?
yielded_relation.last[index]
else
yielded_relation.offset(of - 1).limit(1).pluck(index).first
end
break if last_value.nil?
yielded_relation = batch_relation.where("#{connection.quote_column_name(index)} > ?", last_value)
end
end
ensure
if !$!.is_a?(ActiveRecord::StatementInvalid) || connection.open_transactions == 0
connection.execute "DROP TABLE #{table}"
end
end
end
end
end
ActiveRecord::Relation.prepend(UsefulFindInBatches)
module UsefulBatchEnumerator
def initialize(strategy: nil, **kwargs)
@strategy = strategy
@kwargs = kwargs.except(:relation)
super(**kwargs.slice(:of, :start, :finish, :relation))
end
def each_record
return to_enum(:each_record) unless block_given?
@relation.to_enum(:in_batches, strategy: @strategy, load: true, **@kwargs).each do |relation|
relation.records.each { |record| yield record }
end
end
def delete_all
if @strategy.nil? && (strategy = @relation.infer_in_batches_strategy) == :id
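# loop on a plain limited delete_all: no ORDER BY, and no shipping ids back and forth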
sum = 0
loop do
current = @relation.limit(@of).delete_all
sum += current
break unless current == @of
end
return sum
end
@relation.in_batches(strategy: @strategy || strategy, load: false, **@kwargs, &:delete_all)
end
def update_all(*args)
@relation.in_batches(strategy: @strategy, load: false, **@kwargs) do |relation|
relation.update_all(*args)
end
end
def destroy_all
@relation.in_batches(strategy: @strategy, load: true, **@kwargs, &:destroy_all)
end
def each
enum = @relation.to_enum(:in_batches, strategy: @strategy, load: true, **@kwargs)
return enum.each { |relation| yield relation } if block_given?
enum
end
def pluck(*args)
return to_enum(:pluck, *args) unless block_given?
@relation.in_batches(strategy: @strategy, load: false, **@kwargs) do |relation|
yield relation.pluck(*args)
end
end
end
ActiveRecord::Batches::BatchEnumerator.prepend(UsefulBatchEnumerator)
module BatchWithColumnsPreloaded
def set_values(values)
@loaded_values = values
self
end
def pluck(*args)
return @loaded_values if args == [primary_key.to_sym] && @loaded_values
super
end
end
module LockForNoKeyUpdate
def lock(lock_type = true)
lock_type = 'FOR NO KEY UPDATE' if lock_type == :no_key_update
@@ -799,263 +1164,20 @@ ActiveRecord::Relation.prepend(LockForNoKeyUpdate)
ActiveRecord::Relation.class_eval do
def includes(*args)
return super if args.empty? || args == [nil]
raise "Use preload or eager_load instead of includes"
end
def where!(*args)
raise "where!.not doesn't work in Rails 4.2" if args.empty?
super
end
def uniq(*args)
def uniq(*)
raise "use #distinct instead of #uniq on relations (Rails 5.1 will delegate uniq to to_a)"
end
def select_values_necessitate_temp_table?
return false unless select_values.present?
selects = select_values.flat_map{|sel| sel.to_s.split(",").map(&:strip) }
id_keys = [primary_key, "*", "#{table_name}.#{primary_key}", "#{table_name}.*"]
id_keys.all?{|k| !selects.include?(k) }
end
private :select_values_necessitate_temp_table?
def find_in_batches_needs_temp_table?
order_values.any? ||
group_values.any? ||
select_values.to_s =~ /DISTINCT/i ||
distinct_value ||
select_values_necessitate_temp_table?
end
private :find_in_batches_needs_temp_table?
def should_use_cursor?
(GuardRail.environment == :secondary || connection.readonly?)
end
def find_in_batches_with_cursor(options = {})
batch_size = options[:batch_size] || 1000
klass.transaction do
begin
sql = to_sql
cursor = "#{table_name}_in_batches_cursor_#{sql.hash.abs.to_s(36)}"
connection.execute("DECLARE #{cursor} CURSOR FOR #{sql}")
includes = includes_values + preload_values
klass.unscoped do
batch = connection.uncached { klass.find_by_sql("FETCH FORWARD #{batch_size} FROM #{cursor}") }
while !batch.empty?
ActiveRecord::Associations::Preloader.new.preload(batch, includes) if includes
yield batch
break if batch.size < batch_size
batch = connection.uncached { klass.find_by_sql("FETCH FORWARD #{batch_size} FROM #{cursor}") }
end
end
ensure
unless $!.is_a?(ActiveRecord::StatementInvalid)
connection.execute("CLOSE #{cursor}")
end
end
end
end
def find_in_batches_with_copy(options = {})
# implement the start option as an offset
return offset(options[:start]).find_in_batches_with_copy(options.merge(start: 0)) if options[:start].to_i != 0
limited_query = limit(0).to_sql
full_query = "COPY (#{to_sql}) TO STDOUT"
conn = connection
full_query = conn.annotate_sql(full_query) if defined?(Marginalia)
pool = conn.pool
# remove the connection from the pool so that any queries executed
# while we're running this will get a new connection
pool.remove(conn)
# make sure to log _something_, even if the dbtime is totally off
conn.send(:log, full_query, "#{klass.name} Load") do
# set up all our metadata based on a dummy query (COPY doesn't return any metadata)
result = conn.raw_connection.exec(limited_query)
type_map = conn.raw_connection.type_map_for_results.build_column_map(result)
deco = PG::TextDecoder::CopyRow.new(type_map: type_map)
# see PostgreSQLAdapter#exec_query
types = {}
fields = result.fields
fields.each_with_index do |fname, i|
ftype = result.ftype i
fmod = result.fmod i
types[fname] = conn.send(:get_oid_type, ftype, fmod, fname)
end
column_types = types.dup
columns_hash.each_key { |k| column_types.delete k }
includes = includes_values + preload_values
rows = []
batch_size = options[:batch_size] || 1000
conn.raw_connection.copy_data(full_query, deco) do
while (row = conn.raw_connection.get_copy_data)
rows << row
if rows.size == batch_size
batch = ActiveRecord::Result.new(fields, rows, types).map { |record| instantiate(record, column_types) }
ActiveRecord::Associations::Preloader.new.preload(batch, includes) if includes
yield batch
rows = []
end
end
end
# return the connection now, in case there was only 1 batch, we can avoid a separate connection if the block needs it
pool.synchronize do
pool.send(:adopt_connection, conn)
pool.checkin(conn)
end
pool = nil
unless rows.empty?
batch = ActiveRecord::Result.new(fields, rows, types).map { |record| instantiate(record, column_types) }
ActiveRecord::Associations::Preloader.new.preload(batch, includes) if includes
yield batch
end
end
nil
ensure
if pool
# put the connection back in the pool for reuse
pool.synchronize do
pool.send(:adopt_connection, conn)
pool.checkin(conn)
end
end
end
# in some cases we're doing a lot of work inside
# the yielded block, and holding open a transaction
# or even a connection while we do all that work can
# be a problem for the database, especially if a lot
# of these are happening at once. This strategy
# makes one query to hold onto all the IDs needed for the
# iteration (make sure they'll fit in memory, or you could be sad)
# and yields the objects in batches in the same order as the scope specified
# so the DB connection can be fully recycled during each block.
def find_in_batches_with_pluck_ids(options = {})
batch_size = options[:batch_size] || 1000
all_object_ids = pluck(:id)
current_order_values = order_values
all_object_ids.in_groups_of(batch_size) do |id_batch|
object_batch = klass.unscoped.where(id: id_batch).order(current_order_values)
yield object_batch
end
end
def find_in_batches_with_temp_table(options = {})
Shard.current.database_server.unguard do
can_do_it = Rails.env.production? ||
ActiveRecord::Base.in_migration ||
GuardRail.environment == :deploy ||
(!Rails.env.test? && connection.open_transactions > 0) ||
ActiveRecord::Base.in_transaction_in_test?
raise "find_in_batches_with_temp_table probably won't work outside a migration
and outside a transaction. Unfortunately, it's impossible to automatically
determine a better way to do it that will work correctly. You can try
switching to secondary first (then switching to primary if you modify anything
inside your loop), wrapping in a transaction (but be wary of locking records
for the duration of your query if you do any writes in your loop), or not
forcing find_in_batches to use a temp table (avoiding custom selects,
group, or order)." unless can_do_it
if options[:pluck]
pluck = Array(options[:pluck])
pluck_for_select = pluck.map do |column_name|
if column_name.is_a?(Symbol) && column_names.include?(column_name.to_s)
"#{connection.quote_local_table_name(table_name)}.#{connection.quote_column_name(column_name)}"
else
column_name.to_s
end
end
pluck = pluck.map(&:to_s)
end
batch_size = options[:batch_size] || 1000
if pluck
sql = select(pluck_for_select).to_sql
else
sql = to_sql
end
table = "#{table_name}_find_in_batches_temp_table_#{sql.hash.abs.to_s(36)}"
table = table[-63..-1] if table.length > 63
rows = connection.update("CREATE TEMPORARY TABLE #{table} AS #{sql}")
begin
if (rows > batch_size)
index = "temp_primary_key"
begin
old_proc = connection.raw_connection.set_notice_processor {}
if pluck && pluck.any?{|p| p == primary_key.to_s}
connection.execute("CREATE INDEX #{connection.quote_local_table_name(index)} ON #{connection.quote_local_table_name(table)}(#{connection.quote_column_name(primary_key)})")
index = primary_key.to_s
else
pluck.unshift(index) if pluck
connection.execute "ALTER TABLE #{table}
ADD temp_primary_key SERIAL PRIMARY KEY"
end
ensure
connection.raw_connection.set_notice_processor(&old_proc) if old_proc
end
end
includes = includes_values + preload_values
klass.unscoped do
quoted_plucks = pluck && pluck.map do |column_name|
# Rails 4.2 is going to try to quote them anyway but unfortunately not to the temp table, so just make it explicit
column_names.include?(column_name) ?
Arel.sql("#{connection.quote_local_table_name(table)}.#{connection.quote_column_name(column_name)}") : column_name
end
if pluck
if index
batch = klass.from(table).order(Arel.sql(index)).limit(batch_size).pluck(*quoted_plucks)
else
batch = klass.from(table).pluck(*quoted_plucks)
end
else
if index
sql = "SELECT * FROM #{table} ORDER BY #{index} LIMIT #{batch_size}"
batch = klass.find_by_sql(sql)
else
batch = klass.find_by_sql("SELECT * FROM #{table}")
end
end
while rows > 0
rows -= batch.size
ActiveRecord::Associations::Preloader.new.preload(batch, includes) if includes
yield batch
break if rows <= 0 || batch.size < batch_size
if pluck
last_value = pluck.length == 1 ? batch.last : batch.last[pluck.index(index)]
batch = klass.from(table).order(Arel.sql(index)).where("#{index} > ?", last_value).limit(batch_size).pluck(*quoted_plucks)
else
last_value = batch.last[index]
sql = "SELECT *
FROM #{table}
WHERE #{index} > #{last_value}
ORDER BY #{index} ASC
LIMIT #{batch_size}"
batch = klass.find_by_sql(sql)
end
end
end
ensure
if !$!.is_a?(ActiveRecord::StatementInvalid) || connection.open_transactions == 0
connection.execute "DROP TABLE #{table}"
end
end
end
end
def polymorphic_where(args)
raise ArgumentError unless args.length == 1

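For orientation before the call-site diffs that follow, a sketch of the
batch-enumerator surface this enables (`User.active` is an assumed scope,
not part of this commit):

    # without a block, in_batches returns the patched BatchEnumerator
    User.active.in_batches(of: 10_000).pluck(:id) do |ids|
      Rails.logger.info "got #{ids.length} ids"
    end
    User.active.in_batches.update_all(updated_at: Time.now.utc)
    User.active.in_batches(strategy: :cursor).each_record { |u| u.touch }
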

@@ -494,7 +494,7 @@ module AccountReports
# rather than a cursor for this iteration
# because it often is big enough that the secondary
# kills it mid-run (http://www.postgresql.org/docs/9.0/static/hot-standby.html)
enrol.preload(:root_account, :sis_pseudonym, :role).find_in_batches(start: 0) do |batch|
enrol.preload(:root_account, :sis_pseudonym, :role).find_in_batches(strategy: :id) do |batch|
users = batch.map {|e| User.new(id: e.user_id)}.compact
users += batch.map {|e| User.new(id: e.associated_user_id) unless e.associated_user_id.nil?}.compact
users.uniq!


@@ -35,7 +35,7 @@ module DataFixup::DeleteExtraPlaceholderSubmissions
Course.where(:id => assignment_ids_by_course_id.keys).to_a.each do |course|
course_assignment_ids = assignment_ids_by_course_id[course.id]
StudentEnrollment.where(course: course).in_batches do |relation|
StudentEnrollment.where(course: course).select(:user_id).in_batches(strategy: :cursor) do |relation|
batch_student_ids = relation.pluck(:user_id)
edd = EffectiveDueDates.for_course(course).filter_students_to(batch_student_ids)
course_assignment_ids.each do |assignment_id|


@@ -20,6 +20,6 @@
module DataFixup::DeleteScoresForAssignmentGroups
def self.run
Score.where.not(assignment_group_id: nil).in_batches { |scores| scores.delete_all }
Score.where.not(assignment_group_id: nil).in_batches.delete_all
end
end


@@ -22,8 +22,8 @@ module DataFixup::Lti::FillCustomClaimColumnsForResourceLink
update_context!
drop_resource_links_without_a_context
Lti::ResourceLink.in_batches do |resource_links|
update_lookup_id!(resource_links)
Lti::ResourceLink.find_each do |resource_link|
resource_link.update!(lookup_id: SecureRandom.uuid)
end
end
@@ -47,10 +47,4 @@ module DataFixup::Lti::FillCustomClaimColumnsForResourceLink
joins("INNER JOIN #{Assignment.quoted_table_name} ON assignments.lti_context_id = lti_resource_links.resource_link_id").
update_all("context_type = 'Assignment', context_id = assignments.id")
end
def self.update_lookup_id!(resource_links)
resource_links.each do |resource_link|
resource_link.update!(lookup_id: SecureRandom.uuid)
end
end
end


@@ -19,8 +19,8 @@
module DataFixup::Lti::FillLookupUuidAndResourceLinkUuidColumns
def self.run
resource_links_to_update.in_batches do |resource_links|
update_columns!(resource_links)
resource_links_to_update.find_each do |resource_link|
update_columns!(resource_link)
end
end
@@ -36,25 +36,23 @@ module DataFixup::Lti::FillLookupUuidAndResourceLinkUuidColumns
#
# So, we decide that will be better to follow the second approach by
# re-generate the UUID and set it to the old and the new column for consistency.
def self.update_columns!(resource_links)
resource_links.each do |resource_link|
options = {
lookup_uuid: resource_link.lookup_id,
resource_link_uuid: resource_link.resource_link_id
}
def self.update_columns!(resource_link)
options = {
lookup_uuid: resource_link.lookup_id,
resource_link_uuid: resource_link.resource_link_id
}
unless UuidHelper.valid_format?(resource_link.lookup_id)
Rails.logger.info("[#{name}] generating a new lookup_id for id: #{resource_link.id}, lookup_id: #{resource_link.lookup_id}")
options[:lookup_id] = options[:lookup_uuid] = SecureRandom.uuid
end
unless UuidHelper.valid_format?(resource_link.resource_link_id)
Rails.logger.info("[#{name}] generating a new resource_link_id for id: #{resource_link.id}, resource_link_id: #{resource_link.resource_link_id}")
options[:resource_link_id] = options[:resource_link_uuid] = SecureRandom.uuid
end
resource_link.update!(options)
unless UuidHelper.valid_format?(resource_link.lookup_id)
Rails.logger.info("[#{name}] generating a new lookup_id for id: #{resource_link.id}, lookup_id: #{resource_link.lookup_id}")
options[:lookup_id] = options[:lookup_uuid] = SecureRandom.uuid
end
unless UuidHelper.valid_format?(resource_link.resource_link_id)
Rails.logger.info("[#{name}] generating a new resource_link_id for id: #{resource_link.id}, resource_link_id: #{resource_link.resource_link_id}")
options[:resource_link_id] = options[:resource_link_uuid] = SecureRandom.uuid
end
resource_link.update!(options)
end
def self.resource_links_to_update


@@ -25,9 +25,8 @@ module DataFixup::MigrateMessagesToPartitions
# don't re-run the deletion if we don't need to
unless last_run_date_threshold && last_run_date_threshold >= min_date_threshold
# remove all messages that would be inserted into a dropped partition
while Message.from("ONLY #{Message.quoted_table_name}").
where("created_at < ?", min_date_threshold).limit(1000).delete_all > 0
end
Message.from("ONLY #{Message.quoted_table_name}")
.where("created_at < ?", min_date_threshold).in_batches.delete_all
end
partman = CanvasPartman::PartitionManager.create(Message)


@@ -20,8 +20,6 @@
module DataFixup::PopulateScoresCourseScore
def self.run
Score.where(course_score: nil).in_batches do |scores|
scores.update_all("course_score = (grading_period_id IS NULL)")
end
Score.where(course_score: nil).in_batches.update_all("course_score = (grading_period_id IS NULL)")
end
end


@@ -26,7 +26,7 @@ module DataFixup
id: start_id..end_id,
anonymous_grading: true,
submission_types: ['discussion_topic', 'online_quiz']
).in_batches { |batch| batch.update_all(anonymous_grading: false, updated_at: Time.zone.now) }
).in_batches.update_all(anonymous_grading: false, updated_at: Time.zone.now)
end
end
end


@@ -24,7 +24,7 @@ module DataFixup::UpdateAnonymousGradingSettings
where("courses.id >= ? AND courses.id <= ?", start_at, end_at).
where(feature_flags: {feature: 'anonymous_grading', state: 'on'})
courses_to_disable.find_each(start: 0) do |course|
courses_to_disable.find_each(strategy: :id) do |course|
course.assignments.except(:order).
where.not(anonymous_grading: true).
in_batches.update_all(anonymous_grading: true)
@@ -42,7 +42,7 @@ module DataFixup::UpdateAnonymousGradingSettings
where("accounts.id >= ? AND accounts.id <= ?", start_at, end_at).
where(feature_flags: {feature: 'anonymous_grading', state: 'on'})
accounts_to_disable.find_each(start: 0) do |account|
accounts_to_disable.find_each(strategy: :id) do |account|
# If an account has the feature flag forced to ON, we need to get all
# the courses belonging to that account and every account below it.
# That said, we don't actually need do any work on said courses (since

View File

@@ -240,7 +240,7 @@ class Feature
valid_features = self.definitions.keys
cutoff = Setting.get('obsolete_feature_flag_cutoff_days', 60).to_i.days.ago
delete_scope = FeatureFlag.where('updated_at<?', cutoff).where.not(feature: valid_features)
while delete_scope.limit(1000).delete_all > 0; end
delete_scope.in_batches.delete_all
end
end


@@ -124,22 +124,22 @@ module ActiveRecord
expect do
User.create!
User.select(:name).find_in_batches do |batch|
User.connection.select_value("SELECT COUNT(*) FROM users_find_in_batches_temp_table_#{User.select(:name).to_sql.hash.abs.to_s(36)}")
User.connection.select_value("SELECT COUNT(*) FROM users_in_batches_temp_table_#{User.select(:name).to_sql.hash.abs.to_s(36)}")
end
end.to_not raise_error
end
it "should not use a temp table for a plain query" do
User.create!
User.find_in_batches do |batch|
expect { User.connection.select_value("SELECT COUNT(*) FROM users_find_in_batches_temp_table_#{User.all.to_sql.hash.abs.to_s(36)}") }.to raise_error(ActiveRecord::StatementInvalid)
User.find_in_batches do
expect { User.connection.select_value("SELECT COUNT(*) FROM users_in_batches_temp_table_#{User.all.to_sql.hash.abs.to_s(36)}") }.to raise_error(ActiveRecord::StatementInvalid)
end
end
it "should not use a temp table for a select with id" do
User.create!
User.select(:id).find_in_batches do |batch|
expect { User.connection.select_value("SELECT COUNT(*) FROM users_find_in_batches_temp_table_#{User.select(:id).to_sql.hash.abs.to_s(36)}") }.to raise_error(ActiveRecord::StatementInvalid)
User.select(:id).find_in_batches do
expect { User.connection.select_value("SELECT COUNT(*) FROM users_in_batches_temp_table_#{User.select(:id).to_sql.hash.abs.to_s(36)}") }.to raise_error(ActiveRecord::StatementInvalid)
end
end
@@ -148,7 +148,7 @@
User.create!
selectors.each do |selector|
expect {
User.select(selector).find_in_batches(start: 0){|batch| }
User.select(selector).find_in_batches(strategy: :id) {}
}.not_to raise_error
end
end


@@ -69,114 +69,164 @@ describe ActiveRecord::Base do
end
end
describe "find in batches" do
describe "in batches" do
before :once do
@c1 = course_factory(:name => 'course1', :active_course => true)
@c2 = course_factory(:name => 'course2', :active_course => true)
u1 = user_factory(:name => 'user1', :active_user => true)
u2 = user_factory(:name => 'user2', :active_user => true)
u3 = user_factory(:name => 'user3', :active_user => true)
@e1 = @c1.enroll_student(u1, :enrollment_state => 'active')
@e2 = @c1.enroll_student(u2, :enrollment_state => 'active')
@e3 = @c1.enroll_student(u3, :enrollment_state => 'active')
@e4 = @c2.enroll_student(u1, :enrollment_state => 'active')
@e5 = @c2.enroll_student(u2, :enrollment_state => 'active')
@e6 = @c2.enroll_student(u3, :enrollment_state => 'active')
@u1 = user_factory(name: 'userC', active_user: true)
@u2 = user_factory(name: 'userB', active_user: true)
@u3 = user_factory(name: 'userA', active_user: true)
@e1 = @c1.enroll_student(@u1, :enrollment_state => 'active')
@e2 = @c1.enroll_student(@u2, :enrollment_state => 'active')
@e3 = @c1.enroll_student(@u3, :enrollment_state => 'active')
@e4 = @c2.enroll_student(@u1, :enrollment_state => 'active')
@e5 = @c2.enroll_student(@u2, :enrollment_state => 'active')
@e6 = @c2.enroll_student(@u3, :enrollment_state => 'active')
end
it "should raise an error when not in a transaction" do
expect { User.all.find_in_batches_with_temp_table }.to raise_error /find_in_batches_with_temp_table probably won't work/
shared_examples_for "batches" do
def do_batches(relation, **kwargs)
result = []
extra = defined?(extra_kwargs) ? extra_kwargs : {}
relation.in_batches(**kwargs.reverse_merge(extra).reverse_merge(strategy: strategy)) do |batch|
result << (block_given? ? (yield batch) : batch.to_a)
end
result
end
it "supports start" do
expect(do_batches(Enrollment, start: @e2.id)).to eq [[@e2, @e3, @e4, @e5, @e6]]
end
it "supports finish" do
expect(do_batches(Enrollment, finish: @e3.id)).to eq [[@e1, @e2, @e3]]
end
it "supports start and finish with a small batch size" do
expect(do_batches(Enrollment, of: 2, start: @e2.id, finish: @e4.id)).to eq [[@e2, @e3], [@e4]]
end
it "respects order" do
expect(do_batches(User.order(:name), of: 2)).to eq [[@u3, @u2], [@u1]]
end
it "handles a batch size the exact size of the query" do
expect(do_batches(User.order(:id), of: 3)).to eq [[@u1, @u2, @u3]]
end
it "preloads" do
Account.default.courses.create!
a = do_batches(Account.where(id: Account.default).preload(:courses)).flatten.first
expect(a.courses.loaded?).to eq true
end
it "handles pluck" do
expect do
expect(do_batches(User.order(:id), of: 3, load: false) { |r| r.pluck(:id) }).to eq [[@u1.id, @u2.id, @u3.id]]
end.not_to change(User.connection_pool.connections, :length)
# even with :copy, a new connection should not be taken out (i.e. to satisfy an "actual" query for the pluck)
end
end
it "should find all enrollments from course join in batches" do
e = Course.active.where(id: [@c1, @c2]).select("enrollments.id AS e_id").
joins(:enrollments).order("e_id asc")
batch_size = 2
es = []
Course.transaction do
e.find_in_batches_with_temp_table(:batch_size => batch_size) do |batch|
expect(batch.size).to eq batch_size
batch.each do |r|
es << r["e_id"].to_i
context "with temp_table" do
let(:strategy) { :temp_table }
let(:extra_kwargs) { { ignore_transaction: true } }
include_examples "batches"
it "raises an error when not in a transaction" do
expect { User.all.find_in_batches(strategy: :temp_table) {} }.to raise_error(ArgumentError)
end
it "finds all enrollments from course join" do
e = Course.active.where(id: [@c1, @c2]).select("enrollments.id AS e_id")
.joins(:enrollments).order("e_id asc")
batch_size = 2
es = []
Course.transaction do
e.find_in_batches(strategy: :temp_table, batch_size: batch_size) do |batch|
expect(batch.size).to eq batch_size
batch.each do |r|
es << r["e_id"].to_i
end
end
end
expect(es.length).to eq 6
expect(es).to eq [@e1.id,@e2.id,@e3.id,@e4.id,@e5.id,@e6.id]
end
it "plucks" do
scope = Course.where(id: [@c1, @c2])
cs = []
Course.transaction do
scope.find_in_batches_with_temp_table(batch_size: 1, pluck: :id) do |batch|
cs.concat(batch)
end
end
expect(cs.sort).to eq [@c1.id, @c2.id].sort
end
it "multi-column plucks" do
scope = Course.where(id: [@c1, @c2])
cs = []
Course.transaction do
scope.find_in_batches_with_temp_table(batch_size: 1, pluck: [:id, :name]) do |batch|
cs.concat(batch)
end
end
expect(cs.sort).to eq [[@c1.id, @c1.name], [@c2.id, @c2.name]].sort
end
it "plucks with join" do
scope = Enrollment.joins(:course).where(courses: { id: [@c1, @c2] })
es = []
Course.transaction do
scope.find_in_batches_with_temp_table(batch_size: 2, pluck: :id) do |batch|
es.concat(batch)
end
end
expect(es.sort).to eq [@e1.id, @e2.id, @e3.id, @e4.id, @e5.id, @e6.id].sort
end
end
context "with cursor" do
let(:strategy) { :cursor }
include_examples "batches"
context "sharding" do
specs_require_sharding
it "properly transposes across multiple shards" do
u1 = User.create!
u2 = @shard1.activate { User.create! }
User.transaction do
users = []
User.preload(:pseudonyms).where(id: [u1, u2]).find_each(strategy: :cursor) do |u|
users << u
end
expect(users.sort).to eq [u1, u2].sort
end
end
end
expect(es.length).to eq 6
expect(es).to eq [@e1.id,@e2.id,@e3.id,@e4.id,@e5.id,@e6.id]
end
it "should pluck" do
scope = Course.where(id: [@c1, @c2])
cs = []
Course.transaction do
scope.find_in_batches_with_temp_table(batch_size: 1, pluck: :id) do |batch|
cs.concat(batch)
end
end
expect(cs.sort).to eq [@c1.id, @c2.id].sort
end
context "with copy" do
let(:strategy) { :copy }
let(:extra_kwargs) { { load: true } }
it "should multi-column pluck" do
scope = Course.where(id: [@c1, @c2])
cs = []
Course.transaction do
scope.find_in_batches_with_temp_table(batch_size: 1, pluck: [:id, :name]) do |batch|
cs.concat(batch)
end
end
expect(cs.sort).to eq [[@c1.id, @c1.name], [@c2.id, @c2.name]].sort
end
include_examples "batches"
it "should pluck with join" do
scope = Enrollment.joins(:course).where(courses: { id: [@c1, @c2] })
es = []
Course.transaction do
scope.find_in_batches_with_temp_table(batch_size: 2, pluck: :id) do |batch|
es.concat(batch)
end
end
expect(es.sort).to eq [@e1.id, @e2.id, @e3.id, @e4.id, @e5.id, @e6.id].sort
end
it "should honor preload when using a cursor" do
skip "needs PostgreSQL" unless Account.connection.adapter_name == 'PostgreSQL'
Account.default.courses.create!
Account.transaction do
Account.where(:id => Account.default).preload(:courses).find_each do |a|
expect(a.courses.loaded?).to be_truthy
end
it "works with load: false" do
User.in_batches(strategy: :copy) { |r| expect(r.to_a).to eq [@u1, @u2, @u3] }
end
end
it "should not use a cursor when start is passed" do
skip "needs PostgreSQL" unless Account.connection.adapter_name == 'PostgreSQL'
Account.transaction do
expect(Account).to receive(:find_in_batches_with_cursor).never
Account.where(:id => Account.default).find_each(start: 0) do
end
end
end
it "should raise an error when start is used with group" do
expect {
Account.group(:id).find_each(start: 0) do
end
}.to raise_error(ArgumentError)
end
context "sharding" do
specs_require_sharding
it "properly transposes a cursor query across multiple shards" do
u1 = User.create!
u2 = @shard1.activate { User.create! }
User.transaction do
users = []
User.preload(:pseudonyms).where(id: [u1, u2]).find_each do |u|
users << u
end
expect(users.sort).to eq [u1, u2].sort
end
context "with id" do
it "should raise an error when start is used with group" do
expect do
Account.group(:id).find_each(strategy: :id, start: 0) {}
end.to raise_error(ArgumentError)
end
end
end
@@ -571,6 +621,15 @@ describe ActiveRecord::Base do
end
end
describe "#in_batches.delete_all" do
it "just does a single query, instead of an ordered select and then delete" do
u = User.create!
expect(User.connection).to receive(:exec_query).once.and_call_original
expect(User.where(id: u.id).in_batches.delete_all).to eq 1
expect { User.find(u.id) }.to raise_error(ActiveRecord::RecordNotFound)
end
end
describe "add_index" do
it "should raise an error on too long of name" do
name = 'some_really_long_name_' * 10