yield for each batch_size in fix for audit logs
Currently, we try to get batch_size rows based just on the last key seen, but then we have to make sure we get all records for that key, iterating on the last ordered_id seen for that key. This is correct for covering the keyspace completely, but some keys may have several batches' worth of ordered_ids. Yield for each iteration of the second-tier iteration as well.

Change-Id: Idfc23ef650de63d6afd7a1787e9061a558afa892
Reviewed-on: https://gerrit.instructure.com/38661
Reviewed-by: Nick Cloward <ncloward@instructure.com>
Tested-by: Jenkins <jenkins@instructure.com>
Product-Review: Jacob Fugal <jacob@instructure.com>
QA-Review: Jacob Fugal <jacob@instructure.com>
This commit is contained in:
parent
56f6ac3be0
commit
09d9619f49
|
@ -94,27 +94,29 @@ module DataFixup
|
|||
}
|
||||
|
||||
loop do
|
||||
rows = []
|
||||
|
||||
database.execute(cql, last_seen_key, batch_size, consistency: index.event_stream.read_consistency_level).fetch do |row|
|
||||
row = row.to_hash
|
||||
last_seen_key = row[index.key_column]
|
||||
last_seen_ordered_id = row['ordered_id']
|
||||
rows << row
|
||||
if last_seen_ordered_id == ''
|
||||
rows = []
|
||||
database.execute(cql, last_seen_key, batch_size, consistency: index.event_stream.read_consistency_level).fetch do |row|
|
||||
row = row.to_hash
|
||||
last_seen_key = row[index.key_column]
|
||||
last_seen_ordered_id = row['ordered_id']
|
||||
rows << row
|
||||
end
|
||||
break if rows.empty?
|
||||
yield rows, last_seen_key, last_seen_ordered_id
|
||||
end
|
||||
|
||||
# Sort of lame but we need to get the rest of the rows if the limit excluded them.
|
||||
last_seen_key, last_seen_ordered_id = get_ordered_id_rows(index, last_seen_key, last_seen_ordered_id) do |ordered_id_rows|
|
||||
rows.concat(ordered_id_rows)
|
||||
get_ordered_id_rows(index, last_seen_key, last_seen_ordered_id) do |rows, last_seen_ordered_id|
|
||||
yield rows, last_seen_key, last_seen_ordered_id
|
||||
end
|
||||
|
||||
break if rows.empty?
|
||||
yield rows, last_seen_key, last_seen_ordered_id
|
||||
last_seen_ordered_id = ''
|
||||
end
|
||||
end
|
||||
|
||||
def get_ordered_id_rows(index, last_seen_key, last_seen_ordered_id)
|
||||
return [last_seen_key, last_seen_ordered_id] if last_seen_key.blank?
|
||||
return if last_seen_key.blank?
|
||||
|
||||
cql = %{
|
||||
SELECT #{index.id_column},
|
||||
|
@ -131,16 +133,13 @@ module DataFixup
|
|||
|
||||
database.execute(cql, last_seen_key, last_seen_ordered_id, batch_size, consistency: index.event_stream.read_consistency_level).fetch do |row|
|
||||
row = row.to_hash
|
||||
last_seen_key = row[index.key_column]
|
||||
last_seen_ordered_id = row['ordered_id']
|
||||
rows << row
|
||||
end
|
||||
|
||||
break if rows.empty?
|
||||
yield rows
|
||||
yield rows, last_seen_ordered_id
|
||||
end
|
||||
|
||||
return [last_seen_key, last_seen_ordered_id]
|
||||
end
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in New Issue