cassandra batching, and batch up the page view updates
Add a batch method that uses cassandra's BEGIN BATCH/APPLY BATCH functionality to cut down on the number of network round trips, and possibly save a bit of CPU time on the cassandra servers. Since execute returns a result set, it can't be batched, so we add another method update for INSERT/UPDATE CQL statements that don't return a value. Closes CNVS-3526 test plan: No behavioral change. Do a regression test that page view information is still send to cassandra properly. You can check the debug logs to see that the CQL statements are all sent in one big BEGIN BATCH statement. Change-Id: Ibca4f6fbd84f2644436599c017f1ec8c39783e36 Reviewed-on: https://gerrit.instructure.com/17156 Tested-by: Jenkins <jenkins@instructure.com> Reviewed-by: Cody Cutrer <cody@instructure.com> QA-Review: Clare Hetherington <clare@instructure.com>
This commit is contained in:
parent
cfac26ea29
commit
dd574808c2
|
@ -168,20 +168,37 @@ class PageView < ActiveRecord::Base
|
|||
end
|
||||
|
||||
def create_without_callbacks
|
||||
return super unless self.page_view_method == :cassandra
|
||||
self.created_at ||= Time.zone.now
|
||||
update
|
||||
if user
|
||||
cassandra.execute("INSERT INTO page_views_history_by_context (context_and_time_bucket, ordered_id, request_id) VALUES (?, ?, ?)", "#{user.global_asset_string}/#{PageView.timeline_bucket_for_time(created_at, "User")}", "#{created_at.to_i}/#{request_id[0,8]}", request_id)
|
||||
if self.page_view_method == :cassandra
|
||||
cassandra.batch do
|
||||
update_cassandra
|
||||
end
|
||||
@new_record = false
|
||||
self.id
|
||||
else
|
||||
super
|
||||
end
|
||||
@new_record = false
|
||||
self.id
|
||||
end
|
||||
|
||||
def update_without_callbacks
|
||||
return super unless self.page_view_method == :cassandra
|
||||
if self.page_view_method == :cassandra
|
||||
cassandra.batch do
|
||||
update_cassandra
|
||||
end
|
||||
true
|
||||
else
|
||||
super
|
||||
end
|
||||
end
|
||||
|
||||
def update_cassandra
|
||||
self.created_at ||= Time.zone.now
|
||||
cassandra.update_record("page_views", { :request_id => request_id }, self.changes)
|
||||
true
|
||||
|
||||
if new_record?
|
||||
if user
|
||||
cassandra.update("INSERT INTO page_views_history_by_context (context_and_time_bucket, ordered_id, request_id) VALUES (?, ?, ?)", "#{user.global_asset_string}/#{PageView.timeline_bucket_for_time(created_at, "User")}", "#{created_at.to_i}/#{request_id[0,8]}", request_id)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
named_scope :for_context, proc { |ctx| { :conditions => { :context_type => ctx.class.name, :context_id => ctx.id } } }
|
||||
|
|
|
@ -75,7 +75,9 @@ if PageView.redis_queue?
|
|||
Delayed::Periodic.cron 'PageView.process_cache_queue', '*/1 * * * *' do
|
||||
Shard.with_each_shard do
|
||||
unless Shard.current.settings[:process_page_view_queue] == false
|
||||
PageView.send_later_enqueue_args(:process_cache_queue, :singleton => "PageView.process_cache_queue:#{Shard.current.id}")
|
||||
PageView.send_later_enqueue_args(:process_cache_queue,
|
||||
:singleton => "PageView.process_cache_queue:#{Shard.current.id}",
|
||||
:max_attempts => 1)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -66,6 +66,76 @@ module Canvas::Cassandra
|
|||
result
|
||||
end
|
||||
|
||||
# private Struct used to store batch information
|
||||
class Batch < Struct.new(:statements, :args)
|
||||
def initialize
|
||||
self.statements = []
|
||||
self.args = []
|
||||
end
|
||||
delegate :empty?, :present?, :blank?, :to => :statements
|
||||
|
||||
def to_cql_ary
|
||||
case statements.size
|
||||
when 0
|
||||
raise "Cannot execute an empty batch"
|
||||
when 1
|
||||
statements + args
|
||||
else
|
||||
# http://www.datastax.com/docs/1.1/references/cql/BATCH
|
||||
# note there's no semicolons between statements in the batch
|
||||
cql = []
|
||||
cql << "BEGIN BATCH"
|
||||
cql.concat statements
|
||||
cql << "APPLY BATCH"
|
||||
# join with spaces rather than newlines, because cassandra doesn't care
|
||||
# and syslog doesn't like newlines
|
||||
[cql.join(" ")] + args
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Update, insert or delete from cassandra. The only difference between this
|
||||
# and execute above is that this doesn't return a result set, so it can be
|
||||
# batched up.
|
||||
def update(query, *args)
|
||||
if in_batch?
|
||||
@batch.statements << query
|
||||
@batch.args.concat args
|
||||
else
|
||||
execute(query, *args)
|
||||
end
|
||||
nil
|
||||
end
|
||||
|
||||
# Batch up all execute statements inside the given block, executing when
|
||||
# the block returns successfully.
|
||||
# Note that this only batches up update calls, not execute calls, since
|
||||
# execute expects to return results immediately.
|
||||
# If this method is called again while already in a batch, the same batch
|
||||
# will be re-used and changes won't be executed until the outer batch
|
||||
# returns.
|
||||
# (It may be useful to add a force_new option later)
|
||||
def batch
|
||||
if in_batch?
|
||||
yield
|
||||
else
|
||||
begin
|
||||
@batch = Batch.new
|
||||
yield
|
||||
unless @batch.empty?
|
||||
execute(*@batch.to_cql_ary)
|
||||
end
|
||||
ensure
|
||||
@batch = nil
|
||||
end
|
||||
end
|
||||
nil
|
||||
end
|
||||
|
||||
def in_batch?
|
||||
!!@batch
|
||||
end
|
||||
|
||||
# update an AR-style record in cassandra
|
||||
# table_name is the cassandra table name
|
||||
# primary_key_attrs is a hash of { key => value } attributes to uniquely identify the record
|
||||
|
@ -74,9 +144,9 @@ module Canvas::Cassandra
|
|||
# { "colname" => newvalue }
|
||||
# { "colname" => [oldvalue, newvalue] }
|
||||
def update_record(table_name, primary_key_attrs, changes)
|
||||
statement, args = self.class.build_update_record_cql(table_name, primary_key_attrs, changes)
|
||||
return unless statement
|
||||
execute(statement, *args)
|
||||
batch do
|
||||
do_update_record(table_name, primary_key_attrs, changes)
|
||||
end
|
||||
end
|
||||
|
||||
def select_value(query, *args)
|
||||
|
@ -90,7 +160,7 @@ module Canvas::Cassandra
|
|||
|
||||
protected
|
||||
|
||||
def self.build_update_record_cql(table_name, primary_key_attrs, changes)
|
||||
def do_update_record(table_name, primary_key_attrs, changes)
|
||||
where_args = []
|
||||
primary_key_attrs = primary_key_attrs.with_indifferent_access
|
||||
changes = changes.with_indifferent_access
|
||||
|
@ -112,31 +182,23 @@ module Canvas::Cassandra
|
|||
# split changes into updates and deletes
|
||||
partition { |key,val| val.nil? }
|
||||
|
||||
args = []
|
||||
|
||||
# inserts and updates in cassandra are equivalent,
|
||||
# so no need to differentiate here
|
||||
if updates.present?
|
||||
args = []
|
||||
update_cql = updates.map { |key,val| args << val; "#{key} = ?" }.join(", ")
|
||||
update_statement = "UPDATE #{table_name} SET #{update_cql} WHERE #{where_clause}"
|
||||
statement = "UPDATE #{table_name} SET #{update_cql} WHERE #{where_clause}"
|
||||
args.concat where_args
|
||||
update(statement, *args)
|
||||
end
|
||||
|
||||
if deletes.present?
|
||||
args = []
|
||||
delete_cql = deletes.map(&:first).join(", ")
|
||||
delete_statement = "DELETE #{delete_cql} FROM #{table_name} WHERE #{where_clause}"
|
||||
statement = "DELETE #{delete_cql} FROM #{table_name} WHERE #{where_clause}"
|
||||
args.concat where_args
|
||||
update(statement, *args)
|
||||
end
|
||||
|
||||
if update_statement && delete_statement
|
||||
# http://www.datastax.com/docs/1.1/references/cql/BATCH
|
||||
# note there's no semicolons between statements in the batch
|
||||
statement = "BEGIN BATCH #{update_statement} #{delete_statement} APPLY BATCH"
|
||||
else
|
||||
statement = update_statement || delete_statement
|
||||
end
|
||||
|
||||
return statement, args
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -16,53 +16,142 @@
|
|||
# with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
require File.expand_path(File.dirname(__FILE__) + '/../../spec_helper.rb')
|
||||
require File.expand_path(File.dirname(__FILE__) + '/../../cassandra_spec_helper.rb')
|
||||
|
||||
describe "Canvas::Redis::Cassandra" do
|
||||
let(:db) do
|
||||
@cql_db = mock()
|
||||
CassandraCQL::Database.stubs(:new).returns(@cql_db)
|
||||
Canvas::Cassandra::Database.new([], {}, {})
|
||||
end
|
||||
|
||||
describe "#batch" do
|
||||
it "should do nothing for empty batches" do
|
||||
db.expects(:execute).never
|
||||
db.in_batch?.should == false
|
||||
db.batch do
|
||||
db.in_batch?.should == true
|
||||
end
|
||||
db.in_batch?.should == false
|
||||
end
|
||||
|
||||
it "should do update statements in a batch" do
|
||||
db.expects(:execute).with("1")
|
||||
db.batch { db.update("1") }
|
||||
|
||||
db.expects(:execute).with("BEGIN BATCH UPDATE ? ? UPDATE ? ? APPLY BATCH", 1, 2, 3, 4)
|
||||
db.batch { db.update("UPDATE ? ?", 1, 2); db.update("UPDATE ? ?", 3, 4) }
|
||||
end
|
||||
|
||||
it "should not batch up execute statements" do
|
||||
db.expects(:execute).with("SELECT").returns("RETURN")
|
||||
db.expects(:execute).with("BEGIN BATCH 1 2 APPLY BATCH")
|
||||
db.batch do
|
||||
db.update("1")
|
||||
db.execute("SELECT").should == "RETURN"
|
||||
db.update("2")
|
||||
end
|
||||
end
|
||||
|
||||
it "should allow nested batch calls" do
|
||||
db.expects(:execute).with("BEGIN BATCH 1 2 APPLY BATCH")
|
||||
db.batch do
|
||||
db.update("1")
|
||||
db.batch do
|
||||
db.in_batch?.should == true
|
||||
db.update("2")
|
||||
end
|
||||
end
|
||||
db.in_batch?.should == false
|
||||
end
|
||||
|
||||
it "should clean up from exceptions" do
|
||||
db.expects(:execute).never
|
||||
begin
|
||||
db.batch do
|
||||
db.update("1")
|
||||
raise "oh noes"
|
||||
end
|
||||
rescue
|
||||
db.in_batch?.should == false
|
||||
end
|
||||
db.expects(:execute).with("2")
|
||||
db.batch do
|
||||
db.update("2")
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "#update_record" do
|
||||
it "should do nothing if there are no updates or deletes" do
|
||||
statement, args = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, {})
|
||||
statement.should be_nil
|
||||
db.expects(:execute).never
|
||||
db.update_record("test_table", { :id => 5 }, {})
|
||||
end
|
||||
|
||||
it "should do lone updates" do
|
||||
cql1 = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, { :name => "test" })
|
||||
cql2 = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, { :name => [nil, "test"] })
|
||||
cql1.should == ["UPDATE test_table SET name = ? WHERE id = ?", ["test", 5]]
|
||||
cql1.should == cql2
|
||||
db.expects(:execute).with("UPDATE test_table SET name = ? WHERE id = ?", "test", 5)
|
||||
db.update_record("test_table", { :id => 5 }, { :name => "test" })
|
||||
db.expects(:execute).with("UPDATE test_table SET name = ? WHERE id = ?", "test", 5)
|
||||
db.update_record("test_table", { :id => 5 }, { :name => [nil, "test"] })
|
||||
end
|
||||
|
||||
it "should do multi-updates" do
|
||||
cql = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, { :name => "test", :nick => ["old", "new"] })
|
||||
cql.should == ["UPDATE test_table SET name = ?, nick = ? WHERE id = ?", ["test", "new", 5]]
|
||||
db.expects(:execute).with("UPDATE test_table SET name = ?, nick = ? WHERE id = ?", "test", "new", 5)
|
||||
db.update_record("test_table", { :id => 5 }, { :name => "test", :nick => ["old", "new"] })
|
||||
end
|
||||
|
||||
it "should do lone deletes" do
|
||||
cql1 = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, { :name => nil })
|
||||
cql2 = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, { :name => ["old", nil] })
|
||||
cql1.should == ["DELETE name FROM test_table WHERE id = ?", [5]]
|
||||
cql1.should == cql2
|
||||
db.expects(:execute).with("DELETE name FROM test_table WHERE id = ?", 5)
|
||||
db.update_record("test_table", { :id => 5 }, { :name => nil })
|
||||
db.expects(:execute).with("DELETE name FROM test_table WHERE id = ?", 5)
|
||||
db.update_record("test_table", { :id => 5 }, { :name => ["old", nil] })
|
||||
end
|
||||
|
||||
it "should do multi-deletes" do
|
||||
cql = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, { :name => nil, :nick => ["old", nil] })
|
||||
cql.should == ["DELETE name, nick FROM test_table WHERE id = ?", [5]]
|
||||
db.expects(:execute).with("DELETE name, nick FROM test_table WHERE id = ?", 5)
|
||||
db.update_record("test_table", { :id => 5 }, { :name => nil, :nick => ["old", nil] })
|
||||
end
|
||||
|
||||
it "should do combined updates and deletes" do
|
||||
cql = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, { :name => "test", :nick => nil })
|
||||
cql.should == ["BEGIN BATCH UPDATE test_table SET name = ? WHERE id = ? DELETE nick FROM test_table WHERE id = ? APPLY BATCH", ["test", 5, 5]]
|
||||
db.expects(:execute).with("BEGIN BATCH UPDATE test_table SET name = ? WHERE id = ? DELETE nick FROM test_table WHERE id = ? APPLY BATCH", "test", 5, 5)
|
||||
db.update_record("test_table", { :id => 5 }, { :name => "test", :nick => nil })
|
||||
end
|
||||
|
||||
it "should work when already in a batch" do
|
||||
db.expects(:execute).with("BEGIN BATCH UPDATE ? UPDATE test_table SET name = ? WHERE id = ? DELETE nick FROM test_table WHERE id = ? UPDATE ? APPLY BATCH", 1, "test", 5, 5, 2)
|
||||
db.batch do
|
||||
db.update("UPDATE ?", 1)
|
||||
db.update_record("test_table", { :id => 5 }, { :name => "test", :nick => nil })
|
||||
db.update("UPDATE ?", 2)
|
||||
end
|
||||
end
|
||||
|
||||
it "should handle compound primary keys" do
|
||||
cql = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5, :sub_id => "sub!" }, { :name => "test", :id => 5, :sub_id => [nil, "sub!"] })
|
||||
cql.should == ["UPDATE test_table SET name = ? WHERE id = ? AND sub_id = ?", ["test", 5, "sub!"]]
|
||||
db.expects(:execute).with("UPDATE test_table SET name = ? WHERE id = ? AND sub_id = ?", "test", 5, "sub!")
|
||||
db.update_record("test_table", { :id => 5, :sub_id => "sub!" }, { :name => "test", :id => 5, :sub_id => [nil, "sub!"] })
|
||||
end
|
||||
|
||||
it "should disallow changing a primary key component" do
|
||||
expect {
|
||||
Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5, :sub_id => "sub!" }, { :name => "test", :id => 5, :sub_id => ["old", "sub!"]})
|
||||
db.update_record("test_table", { :id => 5, :sub_id => "sub!" }, { :name => "test", :id => 5, :sub_id => ["old", "sub!"]})
|
||||
}.to raise_error(ArgumentError)
|
||||
end
|
||||
end
|
||||
|
||||
describe "execute and update" do
|
||||
it_should_behave_like "cassandra page views"
|
||||
|
||||
let(:db) { PageView.cassandra }
|
||||
|
||||
it "should return the result from execute" do
|
||||
db.execute("select count(*) from page_views").fetch['count'].should == 0
|
||||
db.select_value("select count(*) from page_views").should == 0
|
||||
db.execute("insert into page_views (request_id, user_id) values (?, ?)", "test", 0).should == nil
|
||||
end
|
||||
|
||||
it "should return nothing from update" do
|
||||
db.update("select count(*) from page_views").should == nil
|
||||
db.update("insert into page_views (request_id, user_id) values (?, ?)", "test", 0).should == nil
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue