add time_to_live config to EventStream

and plumb it through Canvas::Cassandra::Database's *_record methods

refs CNVS-390

Change-Id: I47a19a7040021d58c242f9cb483dd6f029298d84
Reviewed-on: https://gerrit.instructure.com/22155
Tested-by: Jenkins <jenkins@instructure.com>
Reviewed-by: Brian Palmer <brianp@instructure.com>
QA-Review: Jeremy Putnam <jeremyp@instructure.com>
Product-Review: Jacob Fugal <jacob@instructure.com>
This commit is contained in:
Jacob Fugal 2013-07-09 14:21:18 -06:00
parent 65b1e19244
commit a91e6c32b5
5 changed files with 57 additions and 25 deletions

View File

@ -150,18 +150,18 @@ module Canvas::Cassandra
# in other words, changes is a hash in either of these formats (mixing is ok):
# { "colname" => newvalue }
# { "colname" => [oldvalue, newvalue] }
def update_record(table_name, primary_key_attrs, changes)
def update_record(table_name, primary_key_attrs, changes, ttl_seconds=nil)
batch do
do_update_record(table_name, primary_key_attrs, changes)
do_update_record(table_name, primary_key_attrs, changes, ttl_seconds)
end
end
# same as update_record, but preferred when doing inserts -- it skips
# updating columns with nil values, rather than creating tombstone delete
# records for them
def insert_record(table_name, primary_key_attrs, changes)
def insert_record(table_name, primary_key_attrs, changes, ttl_seconds=nil)
changes = changes.reject { |k,v| v.is_a?(Array) ? v.last.nil? : v.nil? }
update_record(table_name, primary_key_attrs, changes)
update_record(table_name, primary_key_attrs, changes, ttl_seconds)
end
def select_value(query, *args)
@ -185,7 +185,7 @@ module Canvas::Cassandra
protected
def do_update_record(table_name, primary_key_attrs, changes)
def do_update_record(table_name, primary_key_attrs, changes, ttl_seconds)
primary_key_attrs = primary_key_attrs.with_indifferent_access
changes = changes.with_indifferent_access
where_clause, where_args = build_where_conditions(primary_key_attrs)
@ -210,8 +210,13 @@ module Canvas::Cassandra
# so no need to differentiate here
if updates.present?
args = []
statement = "UPDATE #{table_name}"
if ttl_seconds
args << ttl_seconds
statement << " USING TTL ?"
end
update_cql = updates.map { |key,val| args << val; "#{key} = ?" }.join(", ")
statement = "UPDATE #{table_name} SET #{update_cql} WHERE #{where_clause}"
statement << " SET #{update_cql} WHERE #{where_clause}"
args.concat where_args
update(statement, *args)
end

View File

@ -23,6 +23,7 @@ class EventStream
attr_config :table, :type => String
attr_config :id_column, :type => String, :default => 'id'
attr_config :record_type, :default => EventStream::Record
attr_config :time_to_live, :type => Fixnum, :default => 1.year
def initialize(&blk)
instance_exec(&blk) if blk
@ -89,6 +90,10 @@ class EventStream
"#{database_name}.#{table}"
end
def ttl_seconds(created_at)
((created_at + time_to_live) - Time.now).to_i
end
private
def fetch_cql
@ -101,8 +106,11 @@ class EventStream
end
def execute(operation, record)
ttl_seconds = self.ttl_seconds(record.created_at)
return if ttl_seconds < 0
database.batch do
database.send(:"#{operation}_record", table, { id_column => record.id }, operation_payload(operation, record))
database.send(:"#{operation}_record", table, { id_column => record.id }, operation_payload(operation, record), ttl_seconds)
run_callbacks(operation, record)
end
rescue Exception => exception

View File

@ -53,11 +53,14 @@ class EventStream::Index
end
def insert(id, key, timestamp)
ttl_seconds = event_stream.ttl_seconds(timestamp)
return if ttl_seconds < 0
prefix = id.to_s[0,8]
bucket = bucket_for_time(timestamp)
key = "#{key}/#{bucket}"
ordered_id = "#{timestamp.to_i}/#{prefix}"
database.update(insert_cql, key, ordered_id, id)
database.update(insert_cql, key, ordered_id, id, ttl_seconds)
end
def for_key(key)
@ -80,7 +83,7 @@ class EventStream::Index
private
def insert_cql
"INSERT INTO #{table} (#{key_column}, ordered_id, #{id_column}) VALUES (?, ?, ?)"
"INSERT INTO #{table} (#{key_column}, ordered_id, #{id_column}) VALUES (?, ?, ?) USING TTL ?"
end
def history(key, pager)

View File

@ -25,7 +25,7 @@ describe EventStream::Index do
def @database.update_record(*args); end
def @database.update(*args); end
@stream = stub('stream', :database => @database)
@stream = stub('stream', :database => @database, :ttl_seconds => 1.year)
end
context "setup block" do
@ -181,7 +181,7 @@ describe EventStream::Index do
end
it "should use the configured table" do
@database.expects(:update).once.with(regexp_matches(/ INTO #{@table} /), anything, anything, anything)
@database.expects(:update).once.with(regexp_matches(/ INTO #{@table} /), anything, anything, anything, anything)
@index.insert(@id, @key, @timestamp)
end
@ -190,20 +190,25 @@ describe EventStream::Index do
key_column = stub(:to_s => "expected_key_column")
@index.key_column key_column
@index.expects(:bucket_for_time).once.with(@timestamp).returns(bucket)
@database.expects(:update).once.with(regexp_matches(/\(#{key_column}, /), "#{@key}/#{bucket}", anything, anything)
@database.expects(:update).once.with(regexp_matches(/\(#{key_column}, /), "#{@key}/#{bucket}", anything, anything, anything)
@index.insert(@id, @key, @timestamp)
end
it "should take a prefix off the id and the bucket for the ordered_id" do
prefix = @id.to_s[0,8]
@database.expects(:update).once.with(regexp_matches(/, ordered_id,/), anything, "#{@timestamp.to_i}/#{prefix}", anything)
@database.expects(:update).once.with(regexp_matches(/, ordered_id,/), anything, "#{@timestamp.to_i}/#{prefix}", anything, anything)
@index.insert(@id, @key, @timestamp)
end
it "should pass through the id into the configured id column" do
id_column = stub(:to_s => "expected_id_column")
@index.id_column id_column
@database.expects(:update).once.with(regexp_matches(/, #{id_column}\)/), anything, anything, @id)
@database.expects(:update).once.with(regexp_matches(/, #{id_column}\)/), anything, anything, @id, anything)
@index.insert(@id, @key, @timestamp)
end
it "should include the ttl" do
@database.expects(:update).once.with(regexp_matches(/ USING TTL /), anything, anything, anything, @stream.ttl_seconds(@timestamp))
@index.insert(@id, @key, @timestamp)
end
end

View File

@ -97,7 +97,7 @@ describe EventStream do
describe "on_insert" do
before do
@record = stub(:id => stub('id'), :attributes => stub('attributes'))
@record = stub(:id => stub('id'), :created_at => Time.now, :attributes => stub('attributes'))
end
it "should register callback for execution during insert" do
@ -126,7 +126,7 @@ describe EventStream do
describe "insert" do
before do
@record = stub(:id => stub('id'), :attributes => stub('attributes'))
@record = stub(:id => stub('id'), :created_at => Time.now, :attributes => stub('attributes'))
end
it "should insert into the configured database" do
@ -135,19 +135,24 @@ describe EventStream do
end
it "should insert into the configured table" do
@database.expects(:insert_record).with(@table.to_s, anything, anything)
@database.expects(:insert_record).with(@table.to_s, anything, anything, anything)
@stream.insert(@record)
end
it "should insert by the record's id into the configured id column" do
id_column = stub(:to_s => stub('id_column'))
@stream.id_column id_column
@database.expects(:insert_record).with(anything, { id_column.to_s => @record.id }, anything)
@database.expects(:insert_record).with(anything, { id_column.to_s => @record.id }, anything, anything)
@stream.insert(@record)
end
it "should insert the record's attributes" do
@database.expects(:insert_record).with(anything, anything, @record.attributes)
@database.expects(:insert_record).with(anything, anything, @record.attributes, anything)
@stream.insert(@record)
end
it "should set the record's ttl" do
@database.expects(:insert_record).with(anything, anything, anything, @stream.ttl_seconds(@record.created_at))
@stream.insert(@record)
end
@ -163,7 +168,7 @@ describe EventStream do
describe "on_update" do
before do
@record = stub(:id => stub('id'), :changes => stub('changes'))
@record = stub(:id => stub('id'), :created_at => Time.now, :changes => stub('changes'))
end
it "should register callback for execution during update" do
@ -192,7 +197,7 @@ describe EventStream do
describe "update" do
before do
@record = stub(:id => stub('id'), :changes => stub('changes'))
@record = stub(:id => stub('id'), :created_at => Time.now, :changes => stub('changes'))
end
it "should update in the configured database" do
@ -201,19 +206,24 @@ describe EventStream do
end
it "should update in the configured table" do
@database.expects(:update_record).with(@table.to_s, anything, anything)
@database.expects(:update_record).with(@table.to_s, anything, anything, anything)
@stream.update(@record)
end
it "should update by the record's id in the configured id column" do
id_column = stub(:to_s => stub('id_column'))
@stream.id_column id_column
@database.expects(:update_record).with(anything, { id_column.to_s => @record.id }, anything)
@database.expects(:update_record).with(anything, { id_column.to_s => @record.id }, anything, anything)
@stream.update(@record)
end
it "should update the record's changes" do
@database.expects(:update_record).with(anything, anything, @record.changes)
@database.expects(:update_record).with(anything, anything, @record.changes, anything)
@stream.update(@record)
end
it "should set the record's ttl" do
@database.expects(:update_record).with(anything, anything, anything, @stream.ttl_seconds(@record.created_at))
@stream.update(@record)
end
@ -291,7 +301,7 @@ describe EventStream do
before do
@record = stub(
:id => stub('id'),
:created_at => stub('created_at', :to_i => 0),
:created_at => Time.now,
:attributes => stub('attributes'),
:changes => stub('changes'),
:entry => @entry
@ -341,6 +351,7 @@ describe EventStream do
@stream.stubs(:database).returns(@database)
@record = stub(
:id => 'id',
:created_at => Time.now,
:attributes => {'attribute' => 'attribute_value'},
:changes => {'changed_attribute' => 'changed_value'})
@exception = Exception.new