support cassandra 1.2+ consistency level usage
This changed from a part of the query string in cassandra 1.1 to a separate parameter passed to execute in cassandra 1.2 and above. Unfortunately we need to jump through some hoops to support both, until we've fully upgraded to 1.2. This commit adds a placeholder to the query string %CONSISTENCY% that will be replaced with the chosen level in 1.1, and replaced with the empty string in 1.2. Once we've upgraded to 1.2, we can remove all this as it'll just be another option to the method. closes CNVS-9273 test plan: Using each of cassandra 1.1, 1.2, and 2.0: * Clear out the consistency level Settings * Setting.connection.delete("DELETE FROM settings WHERE name LIKE 'event_stream.%_consistency%'") * Verify that page views and audit logs can be fetched, verify in the rails logs that no consistency level is given in the CQL query lines * Set the consistency level * Setting.set('event_stream.read_consistency', 'ONE') * Verify that page views and audit logs can be fetched, verify in the rails logs that consistency level "ONE" is given in the CQL query lines. In 1.1, this will be in the query string, in 1.2 and 2.0 it will follow the query in an options hash Change-Id: I3d007376d096e6ed31a40e699e77dca4cdd065a2 Reviewed-on: https://gerrit.instructure.com/35171 Tested-by: Brian Palmer <brianp@instructure.com> Reviewed-by: Jacob Fugal <jacob@instructure.com> QA-Review: August Thornton <august@instructure.com> Product-Review: Brian Palmer <brianp@instructure.com>
This commit is contained in:
parent
1f33ce1038
commit
6f7e610124
|
@ -1,5 +1,5 @@
|
|||
group :cassandra do
|
||||
gem 'cassandra-cql', '1.2.1', :github => 'kreynolds/cassandra-cql', :ref => 'd100be075b04153cf4116da7512892a1e8c0a7e4' #dependency of canvas_cassandra
|
||||
gem 'cassandra-cql', '1.2.2', :github => 'kreynolds/cassandra-cql', :ref => 'beed72e249d02cebc850a4b92b468c8b64b12257' #dependency of canvas_cassandra
|
||||
gem 'simple_uuid', '0.4.0'
|
||||
gem 'thrift', '0.8.0'
|
||||
gem 'thrift_client', '0.8.4'
|
||||
|
|
|
@ -4,4 +4,4 @@ gem 'simplecov-rcov', '0.2.3', :require => false
|
|||
# Specify your gem's dependencies in canvas_cassandra.gemspec
|
||||
gemspec
|
||||
|
||||
gem 'cassandra-cql', '1.2.1', :github => 'kreynolds/cassandra-cql', :ref => 'd100be075b04153cf4116da7512892a1e8c0a7e4'
|
||||
gem 'cassandra-cql', '1.2.2', :github => 'kreynolds/cassandra-cql', :ref => 'beed72e249d02cebc850a4b92b468c8b64b12257'
|
||||
|
|
|
@ -16,7 +16,7 @@ Gem::Specification.new do |spec|
|
|||
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
|
||||
spec.require_paths = ["lib"]
|
||||
|
||||
spec.add_dependency 'cassandra-cql', '1.2.1'
|
||||
spec.add_dependency 'cassandra-cql', '1.2.2'
|
||||
|
||||
spec.add_development_dependency "bundler", "~> 1.5"
|
||||
spec.add_development_dependency "rake"
|
||||
|
|
|
@ -3,4 +3,8 @@ require "benchmark"
|
|||
|
||||
module CanvasCassandra
|
||||
require "canvas_cassandra/database"
|
||||
|
||||
def self.consistency_level(name)
|
||||
CassandraCQL::Thrift::ConsistencyLevel.const_get(name.to_s.upcase)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
module CanvasCassandra
|
||||
|
||||
class Database
|
||||
CONSISTENCY_CLAUSE = %r{%CONSISTENCY% ?}
|
||||
|
||||
def initialize(fingerprint, servers, opts, logger)
|
||||
thrift_opts = {}
|
||||
thrift_opts[:retries] = opts.delete(:retries) if opts.has_key?(:retries)
|
||||
|
@ -38,11 +40,26 @@ module CanvasCassandra
|
|||
# or arel is flexible enough for this, rather than rolling our own.
|
||||
def execute(query, *args)
|
||||
result = nil
|
||||
opts = (args.last.is_a?(Hash) && args.pop) || {}
|
||||
|
||||
ms = 1000 * Benchmark.realtime do
|
||||
result = @db.execute(query, *args)
|
||||
consistency_text = opts[:consistency]
|
||||
consistency = CanvasCassandra.consistency_level(consistency_text) if consistency_text
|
||||
|
||||
if @db.use_cql3? || !consistency
|
||||
query = query.sub(CONSISTENCY_CLAUSE, '')
|
||||
elsif !@db.use_cql3?
|
||||
query = query.sub(CONSISTENCY_CLAUSE, "USING CONSISTENCY #{consistency_text} ")
|
||||
end
|
||||
|
||||
if @db.use_cql3? && consistency
|
||||
result = @db.execute_with_consistency(query, consistency, *args)
|
||||
else
|
||||
result = @db.execute(query, *args)
|
||||
end
|
||||
end
|
||||
|
||||
@logger.debug(" #{"CQL (%.2fms)" % [ms]} #{sanitize(query, args)} [#{fingerprint}]")
|
||||
@logger.debug(" #{"CQL (%.2fms)" % [ms]} #{sanitize(query, args)} #{opts.inspect} [#{fingerprint}]")
|
||||
result
|
||||
end
|
||||
|
||||
|
|
|
@ -19,13 +19,56 @@
|
|||
require "spec_helper"
|
||||
|
||||
describe CanvasCassandra do
|
||||
let(:conn) { double() }
|
||||
|
||||
let(:db) do
|
||||
CanvasCassandra::Database.allocate.tap do |db|
|
||||
db.send(:instance_variable_set, :@db, double())
|
||||
db.send(:instance_variable_set, :@db, conn)
|
||||
db.send(:instance_variable_set, :@logger, double().as_null_object)
|
||||
db.stub(:sanitize).and_return("")
|
||||
end
|
||||
end
|
||||
|
||||
describe "#execute" do
|
||||
# I'm using %CONSISTENCY% as a query parameter here to make sure that the
|
||||
# execute code doesn't accidentally replace the string in those params
|
||||
def run_query(consistency)
|
||||
db.execute("SELECT a %CONSISTENCY% WHERE a = ?", "%CONSISTENCY%", consistency: consistency)
|
||||
end
|
||||
|
||||
describe "cql3" do
|
||||
before do
|
||||
conn.stub(:use_cql3?).and_return(true)
|
||||
end
|
||||
|
||||
it "passes the consistency level as a param" do
|
||||
expect(conn).to receive(:execute_with_consistency).with("SELECT a WHERE a = ?", CassandraCQL::Thrift::ConsistencyLevel::ONE, "%CONSISTENCY%")
|
||||
run_query('ONE')
|
||||
end
|
||||
|
||||
it "ignores a nil consistency" do
|
||||
expect(conn).to receive(:execute).with("SELECT a WHERE a = ?", "%CONSISTENCY%")
|
||||
run_query(nil)
|
||||
end
|
||||
end
|
||||
|
||||
describe "cql2" do
|
||||
before do
|
||||
conn.stub(:use_cql3?).and_return(false)
|
||||
end
|
||||
|
||||
it "passes the consistency level in the query string" do
|
||||
expect(conn).to receive(:execute).with("SELECT a USING CONSISTENCY ONE WHERE a = ?", "%CONSISTENCY%")
|
||||
run_query('ONE')
|
||||
end
|
||||
|
||||
it "ignores a nil consistency" do
|
||||
expect(conn).to receive(:execute).with("SELECT a WHERE a = ?", "%CONSISTENCY%")
|
||||
run_query(nil)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "#batch" do
|
||||
it "does nothing for empty batches" do
|
||||
expect(db).to_not receive(:execute)
|
||||
|
@ -214,4 +257,9 @@ describe CanvasCassandra do
|
|||
expect(db.name).to eq keyspace_name
|
||||
end
|
||||
end
|
||||
|
||||
it "should map consistency level names to values" do
|
||||
expect(CanvasCassandra.consistency_level("LOCAL_QUORUM")).to eq CassandraCQL::Thrift::ConsistencyLevel::LOCAL_QUORUM
|
||||
expect { CanvasCassandra.consistency_level("XXX") }.to raise_error(NameError)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -89,7 +89,7 @@ class EventStream::Index
|
|||
end
|
||||
|
||||
def select_cql
|
||||
"SELECT ordered_id, #{id_column} FROM #{table} #{event_stream.read_consistency_clause}WHERE #{key_column} = ?"
|
||||
"SELECT ordered_id, #{id_column} FROM #{table} %CONSISTENCY% WHERE #{key_column} = ?"
|
||||
end
|
||||
|
||||
def insert_cql
|
||||
|
@ -145,7 +145,7 @@ class EventStream::Index
|
|||
|
||||
# execute the query collecting the results. set the bookmark iff there
|
||||
# was a result after the full page
|
||||
database.execute(qs, *args).fetch do |row|
|
||||
database.execute(qs, *args, consistency: event_stream.read_consistency_level).fetch do |row|
|
||||
if pager.size == pager.per_page
|
||||
pager.has_more!
|
||||
else
|
||||
|
|
|
@ -68,7 +68,7 @@ class EventStream::Stream
|
|||
def fetch(ids)
|
||||
rows = []
|
||||
if available? && ids.present?
|
||||
database.execute(fetch_cql, ids).fetch do |row|
|
||||
database.execute(fetch_cql, ids, consistency: read_consistency_level).fetch do |row|
|
||||
rows << record_type.from_attributes(row.to_hash)
|
||||
end
|
||||
end
|
||||
|
@ -115,13 +115,7 @@ class EventStream::Stream
|
|||
end
|
||||
|
||||
def fetch_cql
|
||||
"SELECT * FROM #{table} #{read_consistency_clause} WHERE #{id_column} IN (?)"
|
||||
end
|
||||
|
||||
def read_consistency_clause
|
||||
unless read_consistency_level.nil? || read_consistency_level.empty?
|
||||
"USING CONSISTENCY #{read_consistency_level}"
|
||||
end
|
||||
"SELECT * FROM #{table} %CONSISTENCY% WHERE #{id_column} IN (?)"
|
||||
end
|
||||
|
||||
private
|
||||
|
|
|
@ -42,7 +42,7 @@ describe EventStream::Index do
|
|||
:database => @database,
|
||||
:record_type => EventStream::Record,
|
||||
:ttl_seconds => 1.year,
|
||||
:read_consistency_clause => nil)
|
||||
:read_consistency_level => nil)
|
||||
end
|
||||
|
||||
context "setup block" do
|
||||
|
@ -247,23 +247,38 @@ describe EventStream::Index do
|
|||
|
||||
allow(@raw_results).to stub_with_multiple_yields
|
||||
|
||||
expect(@database).to receive(:execute).once.and_return(@raw_results)
|
||||
expect(@stream).to receive(:fetch).once.with(@ids[start, requested]).and_return(@typed_results[start, requested])
|
||||
end
|
||||
|
||||
def setup_execute(start, requested)
|
||||
setup_fetch(start, requested)
|
||||
expect(@database).to receive(:execute).once.and_return(@raw_results)
|
||||
end
|
||||
|
||||
it "return a bookmarked collection" do
|
||||
expect(@pager).to be_a BookmarkedCollection::Proxy
|
||||
end
|
||||
|
||||
it "is able to get bookmark from a typed item" do
|
||||
setup_fetch(0, 2)
|
||||
setup_execute(0, 2)
|
||||
page = @pager.paginate(:per_page => 2)
|
||||
expect(page.bookmark_for(page.last)).to eq page.next_bookmark
|
||||
end
|
||||
|
||||
it "uses the configured read_consistency_level" do
|
||||
setup_fetch(0, 2)
|
||||
expect(@database).to receive(:execute).with(/%CONSISTENCY% WHERE/, anything, anything, consistency: nil).and_return(@raw_results)
|
||||
page = @pager.paginate(:per_page => 2)
|
||||
|
||||
setup_fetch(0, 2)
|
||||
@stream.stub(:read_consistency_level).and_return('ALL')
|
||||
expect(@database).to receive(:execute).with(/%CONSISTENCY% WHERE/, anything, anything, consistency: 'ALL').and_return(@raw_results)
|
||||
page = @pager.paginate(:per_page => 2)
|
||||
end
|
||||
|
||||
context "one page of results" do
|
||||
before do
|
||||
setup_fetch(0, 4)
|
||||
setup_execute(0, 4)
|
||||
@page = @pager.paginate(:per_page => 4)
|
||||
end
|
||||
|
||||
|
@ -278,7 +293,7 @@ describe EventStream::Index do
|
|||
|
||||
context "first of multiple pages of results" do
|
||||
before do
|
||||
setup_fetch(0, 2)
|
||||
setup_execute(0, 2)
|
||||
@page = @pager.paginate(:per_page => 2)
|
||||
end
|
||||
|
||||
|
@ -293,10 +308,10 @@ describe EventStream::Index do
|
|||
|
||||
context "last of multiple pages of results" do
|
||||
before do
|
||||
setup_fetch(0, 2)
|
||||
setup_execute(0, 2)
|
||||
page = @pager.paginate(:per_page => 2)
|
||||
|
||||
setup_fetch(2, 2)
|
||||
setup_execute(2, 2)
|
||||
@page = @pager.paginate(:per_page => 2, :page => page.next_page)
|
||||
end
|
||||
|
||||
|
@ -323,14 +338,14 @@ describe EventStream::Index do
|
|||
@index.scrollback_limit Time.now - @newest
|
||||
bucket = @index.bucket_for_time(@newest)
|
||||
expect(@database).to receive(:execute).once.
|
||||
with(/WHERE #{@index.key_column} = \?/, "key/#{bucket}", anything, anything).
|
||||
with(/WHERE #{@index.key_column} = \?/, "key/#{bucket}", anything, anything, anything).
|
||||
and_return(@query)
|
||||
@pager.paginate(:per_page => 1)
|
||||
end
|
||||
|
||||
it "skips results newer than newest in starting bucket" do
|
||||
expect(@database).to receive(:execute).once.
|
||||
with(/AND ordered_id < \?/, anything, anything, "#{@newest.to_i + 1}/").
|
||||
with(/AND ordered_id < \?/, anything, anything, "#{@newest.to_i + 1}/", anything).
|
||||
and_return(@query)
|
||||
@pager.paginate(:per_page => 1)
|
||||
end
|
||||
|
@ -341,7 +356,7 @@ describe EventStream::Index do
|
|||
page, bookmark = page.next_page, page.next_bookmark
|
||||
|
||||
expect(@database).to receive(:execute).once.
|
||||
with(/AND ordered_id < \?/, anything, anything, bookmark[1]).
|
||||
with(/AND ordered_id < \?/, anything, anything, bookmark[1], anything).
|
||||
and_return(@query)
|
||||
@pager.paginate(:per_page => 1, :page => page)
|
||||
end
|
||||
|
@ -362,14 +377,14 @@ describe EventStream::Index do
|
|||
@index.scrollback_limit 1.day
|
||||
bucket = @index.bucket_for_time(@oldest)
|
||||
expect(@database).to receive(:execute).once.
|
||||
with(/WHERE #{@index.key_column} = \?/, "key/#{bucket}", anything).
|
||||
with(/WHERE #{@index.key_column} = \?/, "key/#{bucket}", anything, anything).
|
||||
and_return(@query)
|
||||
@pager.paginate(:per_page => 1)
|
||||
end
|
||||
|
||||
it "skips results older than oldest in any bucket" do
|
||||
expect(@database).to receive(:execute).once.
|
||||
with(/AND ordered_id >= \?/, anything, "#{@oldest.to_i}/").
|
||||
with(/AND ordered_id >= \?/, anything, "#{@oldest.to_i}/", anything).
|
||||
and_return(@query)
|
||||
@pager.paginate(:per_page => 1)
|
||||
end
|
||||
|
@ -383,7 +398,7 @@ describe EventStream::Index do
|
|||
bucket = @index.bucket_for_time(scrollback_limit)
|
||||
|
||||
expect(@database).to receive(:execute).once.
|
||||
with(/WHERE #{@index.key_column} = \?/, "key/#{bucket}", anything).
|
||||
with(/WHERE #{@index.key_column} = \?/, "key/#{bucket}", anything, anything).
|
||||
and_return(@query)
|
||||
@pager.paginate(:per_page => 1)
|
||||
end
|
||||
|
|
|
@ -55,7 +55,7 @@ describe EventStream::Stream do
|
|||
expect(stream.table).to eq table.to_s
|
||||
expect(stream.id_column).to eq id_column.to_s
|
||||
expect(stream.record_type).to eq record_type
|
||||
expect(stream.read_consistency_clause).to eq 'USING CONSISTENCY ALL'
|
||||
expect(stream.read_consistency_level).to eq 'ALL'
|
||||
end
|
||||
|
||||
it "requires database_name and table" do
|
||||
|
@ -162,18 +162,6 @@ describe EventStream::Stream do
|
|||
end
|
||||
end
|
||||
|
||||
describe "read_consistency_clause" do
|
||||
it "returns clause if read_consistency_level is set" do
|
||||
expect(@stream.read_consistency_clause).to be_nil
|
||||
|
||||
@stream.read_consistency_level ''
|
||||
expect(@stream.read_consistency_clause).to be_nil
|
||||
|
||||
@stream.read_consistency_level 'ALL'
|
||||
expect(@stream.read_consistency_clause).to eq 'USING CONSISTENCY ALL'
|
||||
end
|
||||
end
|
||||
|
||||
describe "on_insert" do
|
||||
before do
|
||||
@record = double(:id => double('id'), :created_at => Time.now, :attributes => double('attributes'))
|
||||
|
@ -329,20 +317,20 @@ describe EventStream::Stream do
|
|||
end
|
||||
|
||||
it "uses the configured table" do
|
||||
expect(database).to receive(:execute).once.with(/ FROM #{@table} /, anything).and_return(@results)
|
||||
expect(database).to receive(:execute).once.with(/ FROM #{@table} /, anything, anything).and_return(@results)
|
||||
@stream.fetch([1])
|
||||
end
|
||||
|
||||
it "uses the configured id column" do
|
||||
id_column = double(:to_s => "expected_id_column")
|
||||
@stream.id_column id_column
|
||||
expect(database).to receive(:execute).once.with(/ WHERE #{id_column}/, anything).and_return(@results)
|
||||
expect(database).to receive(:execute).once.with(/ WHERE #{id_column}/, anything, anything).and_return(@results)
|
||||
@stream.fetch([1])
|
||||
end
|
||||
|
||||
it "passes the given ids to the execute" do
|
||||
ids = double('ids', :empty? => false)
|
||||
expect(database).to receive(:execute).once.with(anything, ids).and_return(@results)
|
||||
expect(database).to receive(:execute).once.with(anything, ids, anything).and_return(@results)
|
||||
@stream.fetch(ids)
|
||||
end
|
||||
|
||||
|
@ -366,11 +354,11 @@ describe EventStream::Stream do
|
|||
end
|
||||
|
||||
it "uses the configured consistency level" do
|
||||
expect(database).to receive(:execute).once.with(/^[USING CONSISTENCY ALL]/, anything).and_return(@results)
|
||||
expect(database).to receive(:execute).once.with(/%CONSISTENCY% WHERE/, anything, consistency: nil).and_return(@results)
|
||||
@stream.fetch([1])
|
||||
|
||||
@stream.read_consistency_level 'ALL'
|
||||
expect(database).to receive(:execute).once.with(/USING CONSISTENCY ALL/, anything).and_return(@results)
|
||||
expect(database).to receive(:execute).once.with(/%CONSISTENCY% WHERE/, anything, consistency: "ALL").and_return(@results)
|
||||
@stream.fetch([1])
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue