cassandra store for page views

Adds a new back-end store for page_views, using a Cassandra cluster. All
the current page view queries are supported, many using denormalized
views on the data.

test plan:

first, canvas instances that are currently using AR page views
should function as before.

by Setting.set('enable_page_views', 'cassandra') and restarting, you will
switch to cassandra page views. a script to migrate the AR page views to
Cassandra is coming. all page view functionality should work as before.
note that the format of the pagination headers in the
/api/v1/users/X/page_views endpoint has changed.

Change-Id: I2d1feb4d83b06a0c852e49508e85e8dce87507b4
Reviewed-on: https://gerrit.instructure.com/14258
Reviewed-by: Jacob Fugal <jacob@instructure.com>
Tested-by: Jenkins <jenkins@instructure.com>
This commit is contained in:
Brian Palmer 2012-09-24 14:05:43 -06:00
parent 90b75dad5d
commit 005e42a757
21 changed files with 697 additions and 194 deletions

View File

@ -122,6 +122,10 @@ group :redis do
gem 'redis', '3.0.1'
end
group :cassandra do
gem 'cassandra-cql', '1.1.1'
end
group :embedly do
gem 'embedly', '1.5.5'
end

View File

@ -780,7 +780,7 @@ class ApplicationController < ActionController::Base
@access.summarized_at = nil
@access.last_access = Time.now.utc
@access.save
@page_view.asset_user_access_id = @access.id if @page_view
@page_view.asset_user_access = @access if @page_view
@page_view_update = true
end
if @page_view && !request.xhr? && request.get? && (response.content_type || "").match(/html/)
@ -797,7 +797,7 @@ class ApplicationController < ActionController::Base
end
rescue => e
logger.error "Pageview error!"
raise e if Rails.env == 'development'
raise e if Rails.env.development?
true
end

View File

@ -20,6 +20,8 @@
class PageViewsController < ApplicationController
before_filter :require_user, :only => [:index]
include Api::V1::PageView
def update
render :json => {:ok => true}
# page view update happens in log_page_view after_filter
@ -41,30 +43,19 @@ class PageViewsController < ApplicationController
if authorized_action(@user, @current_user, :view_statistics)
@page_views = Api.paginate(@user.page_views, self, api_v1_user_page_views_url(:user_id => @user), :order => 'created_at DESC', :without_count => :true)
respond_to do |format|
format.html do
if params[:html_xhr]
render :partial => @page_views
end
end
format.js { render :partial => @page_views }
format.json do
if api_request?
stream_json_array(@page_views,
:include_root => false,
:only => (PageView.content_columns.map(&:name) + ['request_id']))
else
render :partial => @page_views
end
render :json => @page_views.map { |pv| page_view_json(pv, @current_user, session) }
end
format.csv {
format.csv do
cancel_cache_buster
data = @user.page_views.paginate(:page => 1, :per_page => Setting.get('page_views_csv_export_rows', '300').to_i)
send_data(
@user.page_views.by_created_at.scoped(:limit=>params[:report_count] || 300).to_a.to_csv,
:type => "text/csv",
:filename => t(:download_filename, "Pageviews For %{user}", :user => @user.name.to_s.gsub(/ /, "_")) + '.csv',
data.to_a.to_csv,
:type => "text/csv",
:filename => t(:download_filename, "Pageviews For %{user}", :user => @user.name.to_s.gsub(/ /, "_")) + '.csv',
:disposition => "attachment"
)
}
)
end
end
end
end

View File

@ -2341,23 +2341,6 @@ class Course < ActiveRecord::Base
def self.serialization_excludes; [:uuid]; end
def page_views_by_day(options={})
conditions = {
:context_id => self.id,
:context_type => self.class.to_s
}
if options[:dates]
conditions.merge!({
:created_at => (options[:dates].first)..(options[:dates].last)
})
end
PageView.count(
:group => "date(created_at)",
:conditions => conditions
)
end
memoize :page_views_by_day
def section_visibilities_for(user)
Rails.cache.fetch(['section_visibilities_for', user, self].cache_key) do
Enrollment.find(:all, :select => "course_section_id, limit_privileges_to_course_section, type, associated_user_id", :conditions => ['user_id = ? AND course_id = ? AND workflow_state != ?', user.id, self.id, 'deleted']).map{|e| {:course_section_id => e.course_section_id, :limit_privileges_to_course_section => e.limit_privileges_to_course_section, :type => e.type, :associated_user_id => e.associated_user_id, :admin => e.admin?} }
@ -2471,19 +2454,6 @@ class Course < ActiveRecord::Base
end
end
def page_view_data(options={})
# if they dont supply a date range then use the first day returned by page_views_by_day (which should be the first day that there is pageview statistics gathered)
dates = options[:dates].nil? ? [page_views_by_day.sort.first.first.to_datetime, Time.now] : options[:dates]
days = []
dates.first.to_datetime.upto(dates.last) do |d|
# this * 1000 part is because the Highcharts expects something like what Date.UTC(2006, 2, 28) would give you,
# which is MILLISECONDS from the unix epoch, ruby's to_f gives you SECONDS since then.
days << [ (d.at_beginning_of_day.to_f * 1000).to_i , page_views_by_day[d.to_date.to_s].to_i ]
end
days
end
memoize :page_view_data
def unpublished?
self.created? || self.claimed?
end

View File

@ -1,5 +1,5 @@
#
# Copyright (C) 2011 Instructure, Inc.
# Copyright (C) 2012 Instructure, Inc.
#
# This file is part of Canvas.
#
@ -17,8 +17,6 @@
#
class PageView < ActiveRecord::Base
include TextHelper
set_primary_key 'request_id'
belongs_to :developer_key
@ -31,83 +29,88 @@ class PageView < ActiveRecord::Base
before_save :cap_interaction_seconds
belongs_to :context, :polymorphic => true
named_scope :of_account, lambda { |account|
{
:conditions => { :account_id => account.self_and_all_sub_accounts }
}
}
named_scope :by_created_at, :order => 'created_at DESC'
attr_accessor :generated_by_hand
attr_accessor :is_update
attr_accessible :url, :user, :controller, :action, :session_id, :developer_key, :user_agent, :real_user
attr_accessible :url, :user, :controller, :action, :session_id, :developer_key, :user_agent, :real_user, :context
def ensure_account
self.account_id ||= (self.context_type == 'Account' ? self.context_id : self.context.account_id) rescue nil
self.account_id ||= (self.context.is_a?(Account) ? self.context : self.context.account) if self.context
end
def interaction_seconds_readable
seconds = (self.interaction_seconds || 0).to_i
if seconds <= 10
t(:insignificant_duration, "--")
else
readable_duration(seconds)
end
end
def cap_interaction_seconds
self.interaction_seconds = [self.interaction_seconds || 5, 10.minutes.to_i].min
end
def user_name
self.user.name rescue t(:default_user_name, "Unknown User")
end
def context_name
self.context.name rescue ""
end
named_scope :after, lambda{ |date|
{:conditions => ['page_views.created_at > ?', date] }
}
named_scope :for_user, lambda {|user_ids|
{:conditions => {:user_id => user_ids} }
}
named_scope :limit, lambda {|limit|
{:limit => limit }
}
# the list of columns we display to users, export to csv, etc
EXPORTED_COLUMNS = %w(request_id user_id url context_id context_type asset_id asset_type controller action contributed interaction_seconds created_at user_request render_time user_agent participated account_id real_user_id)
def self.page_views_enabled?
!!page_view_method
end
def self.page_view_method
enable_page_views = Setting.get_cached('enable_page_views', 'false')
enable_page_views = Shard.current.settings[:page_view_method] || Setting.get_cached('enable_page_views', 'false')
return false if enable_page_views == 'false'
enable_page_views = 'db' if enable_page_views == 'true' # backwards compat
enable_page_views.to_sym
end
# True when the configured page view method is one of the two that are
# buffered through the redis queue ('cache' or 'cassandra').
def self.redis_queue?
  case page_view_method.to_s
  when 'cache', 'cassandra' then true
  else false
  end
end
# True only when the configured page view method is 'cassandra'.
def self.cassandra?
  page_view_method.to_s == 'cassandra'
end
def self.cassandra
@cassandra ||= Canvas::Cassandra::Database.from_config('page_views')
end
def cassandra
self.class.cassandra
end
def self.find_one(id, options)
return super unless cassandra?
find_some([id], options).first || raise(ActiveRecord::RecordNotFound, "Couldn't find PageView with ID=#{id}")
end
def self.find_some(ids, options)
return super unless cassandra?
raise(NotImplementedError, "options not implemented: #{options.inspect}") if options.present?
pvs = []
cassandra.execute("SELECT * FROM page_views WHERE request_id IN (?)", ids).fetch do |row|
pvs << from_cassandra(row)
end
pvs
end
def self.find_every(options)
return super unless cassandra?
raise(NotImplementedError, "find_every not implemented")
end
def self.from_cassandra(attrs)
@blank_template ||= columns.inject({}) { |h,c| h[c.name] = nil; h }
Shard.default.activate { instantiate(@blank_template.merge(attrs)) }
end
def store
self.created_at ||= Time.zone.now
result = case PageView.page_view_method
when :log
Rails.logger.info "PAGE VIEW: #{self.attributes.to_json}"
when :cache
when :cache, :cassandra
json = self.attributes.as_json
json['is_update'] = true if self.is_update
if Canvas.redis.rpush(PageView.cache_queue_name, json.to_json)
true
else
# redis failed, push right to the db
self.save
end
Canvas.redis.rpush(PageView.cache_queue_name, json.to_json)
when :db
self.save
end
self.created_at ||= Time.zone.now
self.store_page_view_to_user_counts
result
@ -129,6 +132,94 @@ class PageView < ActiveRecord::Base
self.is_update = true
end
# Persists a new page view. When the cassandra store is not active this defers
# to the inherited implementation; otherwise the record is written to
# Cassandra and, if the page view has a user, an entry is added to that
# user's history timeline. Returns the record id.
def create_without_callbacks
return super unless self.class.cassandra?
self.created_at ||= Time.zone.now
self.shard = Shard.default
# write the page_views row first so the history row below never points at a
# request_id that has not been stored
# NOTE(review): presumably `update` ends up in update_without_callbacks, which
# performs the Cassandra write -- confirm against ActiveRecord internals
update
if user
# partition key: "<user global asset string>/<timeline bucket>";
# clustering key: "<created_at epoch seconds>/<first 8 chars of request_id>"
cassandra.execute("INSERT INTO page_views_history_by_context (context_and_time_bucket, ordered_id, request_id) VALUES (?, ?, ?)", "#{user.global_asset_string}/#{PageView.timeline_bucket_for_time(created_at, "User")}", "#{created_at.to_i}/#{request_id[0,8]}", request_id)
end
@new_record = false
self.id
end
def update_without_callbacks
return super unless self.class.cassandra?
cassandra.update_record("page_views", { :request_id => request_id }, self.changes)
true
end
named_scope :for_context, proc { |ctx| { :conditions => { :context_type => ctx.class.name, :context_id => ctx.id } } }
named_scope :for_users, proc { |users| { :conditions => { :user_id => users.map(&:id) } } }
# returns a collection with very limited functionality
# basically, it responds to #paginate and returns a
# WillPaginate::Collection-like object
def self.for_user(user)
if cassandra?
PageView::CassandraAssociation.new('User', user.global_id)
else
self.scoped(:conditions => { :user_id => user.id }, :order => 'created_at desc')
end
end
# Stand-in for an association/scope when page views live in Cassandra.
# It only supports #paginate, which walks the page_views_history_by_context
# table one time bucket at a time, newest bucket first.
class CassandraAssociation
# context_type: class name of the owning record (e.g. 'User')
# context_id: id used to build the history partition key
def initialize(context_type, context_id)
@context_type = context_type
@context_id = context_id
# oldest point in time paginate will scan back to; the setting is a number
# of seconds and defaults to 52 weeks
@scrollback_limit = Setting.get('page_views_scrollback_limit:users', 52.weeks.to_s).to_i.ago
end
# Array of results carrying the pagination attributes callers read.
class Collection < Array
attr_accessor :per_page, :next_page
# the following are for will_paginate compatibility, and will always be nil
attr_accessor :previous_page, :first_page, :last_page, :total_pages
end
# Returns a Collection of PageView records, newest first.
#
# options[:per_page] - required, must be > 0 (raises ArgumentError otherwise)
# options[:page]     - optional bookmark of the form "<bucket>:<ordered_id>",
#                      as produced in Collection#next_page; anything else
#                      starts from the current time bucket
def paginate(options)
results = Collection.new
results.per_page = options[:per_page].to_i
raise(ArgumentError, "per_page required") unless results.per_page > 0
# NOTE(review): ^ and $ anchor per line, not per string; \A and \z may be
# what is intended for a request parameter -- confirm
if options[:page].to_s =~ %r{^(\d+):(\d+/\w+)$}
row_ts = $1.to_i
start_column = $2
else
# page 1
row_ts = PageView.timeline_bucket_for_time(Time.now, @context_type)
end
# keep stepping back one bucket at a time until the page is full (next_page
# gets set) or the bucket is older than the scrollback limit
until results.next_page || Time.at(row_ts) < @scrollback_limit
# ask for one row more than is still needed; that extra row, if present,
# becomes the bookmark for the next page
limit = results.per_page + 1 - results.size
args = []
args << "#{@context_type.underscore}_#{@context_id}/#{row_ts}"
if start_column
ordered_id = "AND ordered_id <= ?"
args << start_column
else
ordered_id = nil
end
qs = "SELECT ordered_id, request_id FROM page_views_history_by_context where context_and_time_bucket = ? #{ordered_id} ORDER BY ordered_id DESC LIMIT #{limit}"
PageView.cassandra.execute(qs, *args).fetch do |row|
if results.size == results.per_page
results.next_page = "#{row_ts}:#{row['ordered_id']}"
else
results << row['request_id']
end
end
# the bookmark column only applies to the first bucket scanned
start_column = nil
# possible optimization: query page_views_counters_by_context_and_day ,
# and use it as a secondary index to skip days where the user didn't
# have any page views
row_ts -= PageView.timeline_bucket_size(@context_type)
end
# results holds request ids at this point; swap them for the loaded records,
# keeping the order in which the ids were collected
results.replace(PageView.find(results).sort_by { |pv| results.index(pv.request_id) })
end
end
def self.cache_queue_name
'page_view_queue'
end
@ -175,7 +266,7 @@ class PageView < ActiveRecord::Base
return if attrs['is_update'] && Setting.get_cached('skip_pageview_updates', nil) == "true"
self.transaction(:requires_new => true) do
if attrs['is_update']
page_view = self.find_by_request_id(attrs['request_id'])
page_view = self.find_some([attrs['request_id']], {}).first
return unless page_view
page_view.do_update(attrs)
page_view.save
@ -187,6 +278,19 @@ class PageView < ActiveRecord::Base
rescue ActiveRecord::StatementInvalid => e
end
# Rounds +time+ down to the start of its timeline bucket for the given
# context type and returns that boundary as integer epoch seconds.
def self.timeline_bucket_for_time(time, context_type)
  epoch_seconds = time.to_i
  offset_into_bucket = time.to_i % timeline_bucket_size(context_type)
  epoch_seconds - offset_into_bucket
end
# Width, in seconds, of one timeline bucket for the given context type.
# Only "User" is known; any other type raises a RuntimeError.
def self.timeline_bucket_size(context_type)
  unless "User" == context_type
    raise "don't know bucket size for context type: #{context_type}"
  end
  1.week.to_i
end
def self.user_count_bucket_for_time(time)
utc = time.in_time_zone('UTC')
# round down to the last 5 minute mark -- so 03:43:28 turns into 03:40:00
@ -202,4 +306,12 @@ class PageView < ActiveRecord::Base
Canvas.redis.sadd(bucket, self.user.global_id)
Canvas.redis.expire(bucket, exptime)
end
# to_csv uses these methods, see lib/ext/array.rb
def export_columns(format = nil)
PageView::EXPORTED_COLUMNS
end
def to_row(format = nil)
export_columns(format).map { |c| self.send(c) }
end
end

View File

@ -147,7 +147,6 @@ class User < ActiveRecord::Base
has_many :account_users
has_many :media_objects, :as => :context
has_many :user_generated_media_objects, :class_name => 'MediaObject'
has_many :page_views
has_many :user_notes
has_many :account_reports
has_many :stream_item_instances, :dependent => :delete_all
@ -177,6 +176,10 @@ class User < ActiveRecord::Base
all_conversations.visible.scoped(:order => "last_message_at DESC, conversation_id DESC")
end
def page_views
PageView.for_user(self)
end
named_scope :of_account, lambda { |account|
{
:joins => :user_account_associations,

View File

@ -0,0 +1,29 @@
# This file is used to configure any Apache Cassandra clusters that Canvas is
# going to use. Currently, all Cassandra usage is optional and disabled by
# default.
#
# If you are going to use Cassandra, Canvas requires version >= 1.1.5
#
# Similar to database configuration, Keyspaces are not created by Canvas.
# You'll need to create the keyspace you are going to use *before* enabling
# Cassandra and trying to run the migrations. This gives you the flexibility
# to configure replication to meet your retention requirements.
#
# Properly configuring a Cassandra cluster and replication is not trivial.
# Reading the documentation at http://www.datastax.com/docs/1.1/index is a good
# start.
development:
# To use Cassandra for page views:
# 1. Create the keyspace in Cassandra.
# 2. Enable and fill in the configuration below. You can specify as many "seed" servers as you'd like from the cluster.
# 3. Run rake db:migrate to create the tables in the specified Keyspace.
# 4. In a script/console, run Setting.set('enable_page_views', 'cassandra')
# 5. Restart all app/job processes.
# You'll also need to have redis enabled and configured in redis.yml.
# page_views:
# servers:
# - 127.0.0.1:9160
# keyspace: pageviews

View File

@ -71,7 +71,11 @@ class ActiveRecord::Base
end
def asset_string
@asset_string ||= "#{self.class.base_ar_class.name.underscore}_#{id.to_s}"
@asset_string ||= "#{self.class.base_ar_class.name.underscore}_#{id}"
end
def global_asset_string
@global_asset_string ||= "#{self.class.base_ar_class.name.underscore}_#{global_id}"
end
# little helper to keep checks concise and avoid a db lookup
@ -900,6 +904,10 @@ if defined?(ActiveRecord::ConnectionAdapters::PostgreSQLAdapter)
end
class ActiveRecord::Migration
VALID_TAGS = [:predeploy, :postdeploy, :cassandra]
# at least one of these tags is required
DEPLOY_TAGS = [:predeploy, :postdeploy]
class << self
def transactional?
@transactional != false
@ -909,7 +917,7 @@ class ActiveRecord::Migration
end
def tag(*tags)
raise "invalid tags #{tags.inspect}" unless tags - [:predeploy, :postdeploy] == []
raise "invalid tags #{tags.inspect}" unless tags - VALID_TAGS == []
(@tags ||= []).concat(tags).uniq!
end
@ -930,10 +938,14 @@ end
class ActiveRecord::MigrationProxy
delegate :connection, :transactional?, :tags, :to => :migration
# A migration that does not define runnable? is always considered runnable;
# otherwise the migration's own answer is returned.
def runnable?
  return true unless migration.respond_to?(:runnable?)
  migration.runnable?
end
def load_migration
load(filename)
@migration = name.constantize
raise "#{self.name} (#{self.version}) is not tagged as predeploy or postdeploy!" if @migration.tags.empty? && self.version > 20120217214153
raise "#{self.name} (#{self.version}) is not tagged as predeploy or postdeploy!" if (@migration.tags & ActiveRecord::Migration::DEPLOY_TAGS).empty? && self.version > 20120217214153
@migration
end
end
@ -973,6 +985,11 @@ class ActiveRecord::Migrator
end
end
# The pending migrations, keeping only those that report themselves runnable.
def pending_migrations_with_runnable
  pending_migrations_without_runnable.select { |candidate| candidate.runnable? }
end
alias_method_chain :pending_migrations, :runnable
def migrate(tag = nil)
current = migrations.detect { |m| m.version == current_version }
target = migrations.detect { |m| m.version == @target_version }
@ -1001,6 +1018,7 @@ class ActiveRecord::Migrator
end
next if !tag.nil? && !migration.tags.include?(tag)
next if !migration.runnable?
begin
ddl_transaction(migration) do

View File

@ -68,7 +68,7 @@ if Mailman.config.poll_interval == 0 && Mailman.config.ignore_stdin == true
end
end
if PageView.page_view_method == :cache
if PageView.redis_queue?
# periodically pull new page views off the cache and insert them into the db
Delayed::Periodic.cron 'PageView.process_cache_queue', '*/1 * * * *' do
Shard.with_each_shard do

View File

@ -0,0 +1,59 @@
# Creates the two Cassandra tables used by the cassandra page view store:
# page_views (one row per request_id) and page_views_history_by_context
# (request ids keyed by a context/time-bucket string, ordered within the
# bucket by ordered_id).
class AddCassandraPageViewTables < ActiveRecord::Migration
tag :predeploy
# supplies the `cassandra` connection helper used below
include Canvas::Cassandra::Migration
# name of the cluster configuration this migration runs against
def self.cassandra_cluster
'page_views'
end
# create both tables, each with Deflate sstable compression
def self.up
cassandra.execute %{
CREATE TABLE page_views (
request_id text PRIMARY KEY,
session_id text,
user_id bigint,
url text,
context_id bigint,
context_type text,
asset_id bigint,
asset_type text,
controller text,
action text,
contributed boolean,
interaction_seconds double,
created_at timestamp,
updated_at timestamp,
developer_key_id bigint,
user_request boolean,
render_time double,
user_agent text,
asset_user_access_id bigint,
participated boolean,
summarized boolean,
account_id bigint,
real_user_id bigint,
) WITH
compression_parameters:sstable_compression='DeflateCompressor';
}
cassandra.execute %{
CREATE TABLE page_views_history_by_context (
context_and_time_bucket text,
ordered_id text,
request_id text,
PRIMARY KEY (context_and_time_bucket, ordered_id)
) WITH
compression_parameters:sstable_compression='DeflateCompressor';
}
end
# drop the tables in the reverse order of creation
def self.down
cassandra.execute %{
DROP TABLE page_views_history_by_context;
}
cassandra.execute %{
DROP TABLE page_views;
}
end
end

View File

@ -210,20 +210,6 @@ module Api
}
end
# stream an array of objects as a json response, without building a string of
# the whole response in memory.
def stream_json_array(array, json_opts)
response.content_type ||= Mime::JSON
render :text => proc { |r, o|
o.write('[')
array.each_with_index { |v,i|
o.write(v.to_json(json_opts));
o.write(',') unless i == array.length - 1
}
o.write(']')
}
end
# See User.submissions_for_given_assignments and SubmissionsApiController#for_students
mattr_accessor :assignment_ids_for_students_api

29
lib/api/v1/page_view.rb Normal file
View File

@ -0,0 +1,29 @@
#
# Copyright (C) 2012 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
# JSON serialization helper for page views, meant to be mixed into
# controllers that render them through the API.
module Api::V1::PageView
include Api::V1::Json
# Serialize by calling each exported column as a method on the record; the
# column list is shared with PageView::EXPORTED_COLUMNS.
API_PAGE_VIEW_JSON_OPTS = {
:methods => ::PageView::EXPORTED_COLUMNS,
}
# Returns the API JSON for a single page view by delegating to api_json with
# the options above.
def page_view_json(page_view, current_user, session)
api_json(page_view, current_user, session, API_PAGE_VIEW_JSON_OPTS)
end
end

View File

@ -96,7 +96,7 @@ module Api::V1::User
def user_json_is_admin?(context = @context, current_user = @current_user)
@user_json_is_admin ||= {}
@user_json_is_admin[[context.class.name, context.id, current_user.id]] ||= (
if context.is_a?(UserProfile)
if context.is_a?(::UserProfile)
permissions_context = permissions_account = @domain_root_account
else
permissions_context = context

151
lib/canvas/cassandra.rb Normal file
View File

@ -0,0 +1,151 @@
#
# Copyright (C) 2012 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
module Canvas::Cassandra
class Database
# Tells whether the named cluster has both 'servers' and 'keyspace' set in
# the cassandra config. Returns a truthy value (the keyspace) when it does
# and a falsy value otherwise. Raises ArgumentError for a blank name.
def self.configured?(config_name)
  raise ArgumentError, "config name required" if config_name.blank?
  cluster = Setting.from_config('cassandra').try(:[], config_name)
  return cluster unless cluster
  cluster['servers'] && cluster['keyspace']
end
# Builds a Database from the named entry of the cassandra config.
# Raises ArgumentError when the entry is absent, and a RuntimeError when the
# entry lacks servers or a keyspace.
def self.from_config(config_name)
  cluster = Setting.from_config('cassandra').try(:[], config_name)
  if !cluster
    raise ArgumentError, "No configuration for Cassandra for: #{config_name.inspect}"
  end

  server_list = Array(cluster['servers'])
  raise "No Cassandra servers defined for: #{config_name.inspect}" if server_list.blank?

  keyspace_name = cluster['keyspace']
  raise "No keyspace specified for: #{config_name.inspect}" if keyspace_name.blank?

  new(server_list, keyspace_name)
end
# Names of every cluster listed in the cassandra config, or an empty array
# when there is no such config.
def self.config_names
  cluster_names = Setting.from_config('cassandra').try(:keys)
  cluster_names ? cluster_names : []
end
# servers:  list of "host:port" strings for the cluster's seed nodes
# keyspace: name of the keyspace to operate in
def initialize(servers, keyspace)
# load the gems in the :cassandra bundler group lazily, so they are only
# required once a Database is actually built; this has to happen before
# CassandraCQL is referenced on the next line
Bundler.require 'cassandra'
@db = CassandraCQL::Database.new(servers, :keyspace => keyspace, :cql_version => '3.0.0')
end
attr_reader :db
# Runs a raw CQL query, substituting +args+ for the `?` placeholders, logs the
# sanitized statement together with its execution time at debug level, and
# returns whatever the underlying driver returns.
#
# Though cassandra isn't relational, it'd still be useful to be able to
# build up queries using scopes. Maybe one of the ruby libs like datamapper
# or arel is flexible enough for this, rather than rolling our own.
def execute(query, *args)
  result = nil
  ms = Benchmark.ms do
    result = @db.execute(query, *args)
  end
  # Format the timing on its own. The sanitized CQL can contain `%` (for
  # example a url-encoded "%20" in a bound value), and running it through
  # String#% would treat that as a format directive and raise or mangle
  # the log line.
  elapsed = "%.2f" % ms
  Rails.logger.debug(" CQL (#{elapsed}ms) #{::CassandraCQL::Statement.sanitize(query, args)}")
  result
end
# Writes changes for an AR-style record to cassandra.
#
# table_name        - the cassandra table to write to
# primary_key_attrs - hash of { key => value } that uniquely identifies the record
# changes           - hash of updates to apply; each entry is either in the
#                     AR#changes format (so record.changes can be passed
#                     directly) or a plain value, and the two may be mixed:
#                       { "colname" => newvalue }
#                       { "colname" => [oldvalue, newvalue] }
#
# Nothing is executed (and nil is returned) when there is nothing to write.
def update_record(table_name, primary_key_attrs, changes)
  cql, bind_values = self.class.build_update_record_cql(table_name, primary_key_attrs, changes)
  execute(cql, *bind_values) if cql
end
# Runs the query and returns the first column of the first row, or a falsy
# value when the query produced no row.
def select_value(query, *args)
  first_row = execute(query, *args).fetch
  return first_row unless first_row
  first_row.to_hash.values.first
end
# The driver's keyspace object whose name matches the keyspace this
# connection is using, or nil when none matches.
def keyspace_information
  @db.keyspaces.detect do |candidate|
    candidate.name == @db.keyspace
  end
end
protected
# Builds the CQL needed to apply +changes+ to the record identified by
# +primary_key_attrs+ in +table_name+.
#
# Returns [statement, args]: statement is an UPDATE, a DELETE, a BATCH
# wrapping both, or nil when there is nothing to write; args are the bind
# values in placeholder order. Raises ArgumentError when a change would
# alter a primary key column that already has a value.
def self.build_update_record_cql(table_name, primary_key_attrs, changes)
  primary_key_attrs = primary_key_attrs.with_indifferent_access
  changes = changes.with_indifferent_access

  # the WHERE clause lists the key columns in sorted order so the generated
  # cql is deterministic
  key_pairs = primary_key_attrs.sort_by { |column, _value| column.to_s }
  where_clause = key_pairs.map { |column, _value| "#{column} = ?" }.join(" AND ")
  where_args = key_pairs.map { |_column, value| value }

  primary_key_attrs.each_key do |key_column|
    change = changes[key_column]
    next unless change.is_a?(Array)
    next if change.first.nil?
    raise ArgumentError, "Cannot change the primary key of a record, attempted to change #{key_column} #{change.inspect}"
  end

  # reduce both accepted formats to [column, new_value], leaving out the key
  # columns since those already live in the WHERE clause
  pending = []
  changes.each do |column, value|
    name = column.to_s
    next if primary_key_attrs.key?(name)
    pending << [name, value.is_a?(Array) ? value.last : value]
  end
  pending = pending.sort_by { |name, _value| name }

  # a nil new value means the column gets deleted rather than set
  cleared = pending.select { |_name, value| value.nil? }
  assigned = pending.reject { |_name, value| value.nil? }

  statements = []
  args = []

  # inserts and updates in cassandra are equivalent,
  # so no need to differentiate here
  unless assigned.empty?
    set_clause = assigned.map { |name, _value| "#{name} = ?" }.join(", ")
    statements << "UPDATE #{table_name} SET #{set_clause} WHERE #{where_clause}"
    args.concat(assigned.map { |_name, value| value })
    args.concat(where_args)
  end

  unless cleared.empty?
    column_list = cleared.map { |name, _value| name }.join(", ")
    statements << "DELETE #{column_list} FROM #{table_name} WHERE #{where_clause}"
    args.concat(where_args)
  end

  statement =
    case statements.size
    when 0 then nil
    when 1 then statements.first
    else
      # http://www.datastax.com/docs/1.1/references/cql/BATCH
      # note there's no semicolons between statements in the batch
      "BEGIN BATCH #{statements.join(' ')} APPLY BATCH"
    end

  return statement, args
end
end
# Mix-in for migrations that operate on a Cassandra cluster. Including it
# tags the migration :cassandra and adds the class-level helpers below. The
# including migration is expected to define a cassandra_cluster class method
# naming the cluster configuration to use.
module Migration
  def self.included(migration)
    migration.tag :cassandra
    migration.extend ClassMethods
  end

  module ClassMethods
    # The migration only runs on the default shard, and only when the named
    # cluster is configured. Raises when the migration has not declared a
    # cluster.
    def runnable?
      cluster_declared = respond_to?(:cassandra_cluster) && cassandra_cluster.present?
      raise "cassandra_cluster is required to be defined" unless cluster_declared
      Shard.current.default? && Canvas::Cassandra::Database.configured?(cassandra_cluster)
    end

    # Memoized connection to the migration's cluster.
    def cassandra
      @cassandra ||= Canvas::Cassandra::Database.from_config(cassandra_cluster)
    end
  end
end
end

View File

@ -74,6 +74,11 @@ ActiveRecord::Base.class_eval do
Shard.default
end
# Stand-in shard setter for installations without real sharding. The value
# is not stored; the setter only enforces that a shard may be assigned while
# the record is still new, raising ActiveRecord::ReadOnlyRecord otherwise.
def shard=(new_shard)
  # the constant has to be fully qualified: this method is defined inside a
  # class_eval block, where constants resolve lexically rather than against
  # ActiveRecord::Base, so a bare ReadOnlyRecord raises NameError
  raise ActiveRecord::ReadOnlyRecord unless new_record?
  new_shard
end
def global_id
id
end

View File

@ -170,17 +170,21 @@ namespace :db do
end
namespace :test do
unless Rake::Task.task_defined?('db:test:reset')
task :reset => [:environment, :load_config] do
raise "Run with RAILS_ENV=test" unless Rails.env.test?
config = ActiveRecord::Base.configurations['test']
queue = config['queue']
drop_database(queue) if queue rescue nil
drop_database(config) rescue nil
create_database(queue) if queue
create_database(config)
Rake::Task['db:migrate'].invoke
task :reset => [:environment, :load_config] do
raise "Run with RAILS_ENV=test" unless Rails.env.test?
config = ActiveRecord::Base.configurations['test']
queue = config['queue']
drop_database(queue) if queue rescue nil
drop_database(config) rescue nil
Canvas::Cassandra::Database.config_names.each do |cass_config|
db = Canvas::Cassandra::Database.from_config(cass_config)
db.keyspace_information.tables.each do |table|
db.execute("DROP TABLE #{table}")
end
end
create_database(queue) if queue
create_database(config)
Rake::Task['db:migrate'].invoke
end
end
end

View File

@ -18,6 +18,7 @@
require File.expand_path(File.dirname(__FILE__) + '/../api_spec_helper')
require File.expand_path(File.dirname(__FILE__) + '/../file_uploads_spec_helper')
require File.expand_path(File.dirname(__FILE__) + '/../../cassandra_spec_helper')
class TestUserApi
include Api::V1::User
@ -198,24 +199,36 @@ describe "Users API", :type => :integration do
JSON.parse(response.body).should == {"status"=>"unauthorized", "message"=>"You are not authorized to perform that action."}
end
it "should return page view history" do
page_view_model(:user => @student, :created_at => 1.day.ago)
page_view_model(:user => @student)
page_view_model(:user => @student, :created_at => 1.day.from_now)
Setting.set('api_max_per_page', '2')
json = api_call(:get, "/api/v1/users/#{@student.id}/page_views?per_page=1000",
{ :controller => "page_views", :action => "index", :user_id => @student.to_param, :format => 'json', :per_page => '1000' })
json.size.should == 2
json.each { |j| j['url'].should == "http://www.example.com/courses/1" }
json[0]['created_at'].should be > json[1]['created_at']
response.headers['Link'].should match /next/
response.headers['Link'].should_not match /last/
json = api_call(:get, "/api/v1/users/sis_user_id:sis-user-id/page_views?page=2",
{ :controller => "page_views", :action => "index", :user_id => 'sis_user_id:sis-user-id', :format => 'json', :page => '2' })
json.size.should == 1
json.each { |j| j['url'].should == "http://www.example.com/courses/1" }
response.headers['Link'].should_not match /next/
response.headers['Link'].should_not match /last/
shared_examples_for "page view api" do
it "should return page view history" do
page_view_model(:user => @student, :created_at => 2.days.ago)
page_view_model(:user => @student)
page_view_model(:user => @student, :created_at => 1.day.ago)
Setting.set('api_max_per_page', '2')
json = api_call(:get, "/api/v1/users/#{@student.id}/page_views?per_page=1000",
{ :controller => "page_views", :action => "index", :user_id => @student.to_param, :format => 'json', :per_page => '1000' })
json.size.should == 2
json.each { |j| j['url'].should == "http://www.example.com/courses/1" }
json[0]['created_at'].should be > json[1]['created_at']
response.headers['Link'].should match /next/
response.headers['Link'].should_not match /last/
response.headers['Link'].split(',').find { |l| l =~ /<([^>]+)>.+next/ }
url = $1
page = Rack::Utils.parse_nested_query(url)['page']
json = api_call(:get, url,
{ :controller => "page_views", :action => "index", :user_id => @student.to_param, :format => 'json', :page => page, :per_page => Setting.get('api_max_per_page', '2') })
json.size.should == 1
json.each { |j| j['url'].should == "http://www.example.com/courses/1" }
response.headers['Link'].should_not match /next/
response.headers['Link'].should_not match /last/
end
end
it_should_behave_like "page view api"
describe "cassandra page views" do
it_should_behave_like "cassandra page views"
it_should_behave_like "page view api"
end
it "shouldn't find users in other root accounts by sis id" do

View File

@ -0,0 +1,31 @@
#
# Copyright (C) 2012 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
require File.expand_path(File.dirname(__FILE__) + '/spec_helper')
# Shared setup for specs that should run against the cassandra page view
# store. Include it with it_should_behave_like "cassandra page views".
shared_examples_for "cassandra page views" do
before do
if Canvas::Cassandra::Database.configured?('page_views')
# switch the page view method to cassandra and start each example from
# empty tables
Setting.set('enable_page_views', 'cassandra')
PageView.cassandra.execute("TRUNCATE page_views")
PageView.cassandra.execute("TRUNCATE page_views_history_by_context")
else
# no cluster configured for this environment: mark the example pending
# rather than failing it
pending "needs cassandra page_views configuration"
end
end
end

View File

@ -17,6 +17,7 @@
#
require File.expand_path(File.dirname(__FILE__) + '/../spec_helper')
require File.expand_path(File.dirname(__FILE__) + '/../cassandra_spec_helper')
describe PageViewsController do
@ -39,47 +40,36 @@ describe PageViewsController do
pg
end
# Shared examples for the CSV format of GET 'index'; each page view context
# in this spec includes them via it_should_behave_like.
shared_examples_for "GET 'index' as csv" do
  it "should succeed" do
    course_with_teacher_logged_in
    student_in_course
    page_view(@user, '/somewhere/in/app', :created_at => 2.days.ago)

    get 'index', :user_id => @user.id, :format => 'csv'

    response.should be_success
  end

  it "should order rows by created_at in DESC order" do
    course_with_teacher_logged_in
    student_in_course
    # created out of chronological order on purpose
    middle = page_view(@user, '/somewhere/in/app', :created_at => 2.days.ago)
    newest = page_view(@user, '/somewhere/in/app/1', :created_at => 1.day.ago)
    oldest = page_view(@user, '/somewhere/in/app/2', :created_at => 3.days.ago)

    get 'index', :user_id => @user.id, :format => 'csv'

    response.should be_success
    csv_rows = FasterCSV.parse(response.body, :headers => true)
    actual_dates = csv_rows.map { |row| row['created_at'] }
    expected_dates = [newest, middle, oldest].map { |pv| pv.created_at.to_s }
    actual_dates.should == expected_dates
  end
end
context "with enable_page_views" do
# Runs the shared CSV examples with the 'enable_page_views' setting set to true.
context "with db page views" do
  before(:each) { Setting.set('enable_page_views', true) }

  it_should_behave_like "GET 'index' as csv"
end
# HTML responses from GET 'index': the body is empty for a plain request and
# non-empty when the :html_xhr param is passed.
describe "GET 'index'" do
  it "should return nothing when HTML and not AJAX" do
    course_with_teacher_logged_in

    get 'index', :user_id => @user.id

    response.should be_success
    response.body.should be_blank
  end

  it "should return content when HTML and AJAX" do
    course_with_teacher_logged_in

    get 'index', :user_id => @user.id, :html_xhr => true

    response.should be_success
    response.body.should_not be_blank
  end
end
# CSV export of a user's page views through GET 'index'.
describe "GET 'index' as csv" do
  it "should succeed" do
    course_with_teacher_logged_in
    student_in_course
    page_view(@user, '/somewhere/in/app', :created_at => 2.days.ago)
    get 'index', :user_id => @user.id, :format => 'csv'
    response.should be_success
  end

  # Description fixed (was "should succeed order rows ...") so it matches the
  # wording of the equivalent shared example in this spec.
  it "should order rows by created_at in DESC order" do
    course_with_teacher_logged_in
    student_in_course
    # created out of chronological order so the response must do the sorting
    page_view(@user, '/somewhere/in/app', :created_at => '2012-04-30 20:48:04') # 2nd day
    page_view(@user, '/somewhere/in/app/1', :created_at => '2012-04-29 20:48:04') # 1st day
    page_view(@user, '/somewhere/in/app/2', :created_at => '2012-05-01 20:48:04') # 3rd day
    get 'index', :user_id => @user.id, :format => 'csv'
    response.should be_success
    # newest row first; parenthesized so the regex literal is not an ambiguous first argument
    response.body.should match(/2012-05-01 20:48:04 UTC.*\n.*2012-04-30 20:48:04 UTC.*\n.*2012-04-29 20:48:04 UTC/)
  end
end
# Cassandra variant: includes the shared 'cassandra page views' setup group and
# then the same shared CSV examples that the "with db page views" context runs.
context "with cassandra page views" do
it_should_behave_like 'cassandra page views'
it_should_behave_like "GET 'index' as csv"
end
end

View File

@ -0,0 +1,68 @@
#
# Copyright (C) 2012 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
require File.expand_path(File.dirname(__FILE__) + '/../../spec_helper.rb')
# Specs for the CQL builder on Canvas::Cassandra::Database.
# The group label previously read "Canvas::Redis::Cassandra", which is not the
# class under test; every example here calls Canvas::Cassandra::Database.
describe "Canvas::Cassandra::Database" do
  # Each example calls build_update_record_cql(table, primary_key_attrs, changes)
  # and checks the returned [statement, args] pair. A change value is either the
  # new value itself or an [old, new] pair; a nil new value produces a DELETE.
  describe "#update_record" do
    it "should do nothing if there are no updates or deletes" do
      statement, _args = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, {})
      statement.should be_nil
    end

    it "should do lone updates" do
      cql1 = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, { :name => "test" })
      cql2 = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, { :name => [nil, "test"] })
      cql1.should == ["UPDATE test_table SET name = ? WHERE id = ?", ["test", 5]]
      cql1.should == cql2
    end

    it "should do multi-updates" do
      cql = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, { :name => "test", :nick => ["old", "new"] })
      cql.should == ["UPDATE test_table SET name = ?, nick = ? WHERE id = ?", ["test", "new", 5]]
    end

    it "should do lone deletes" do
      cql1 = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, { :name => nil })
      cql2 = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, { :name => ["old", nil] })
      cql1.should == ["DELETE name FROM test_table WHERE id = ?", [5]]
      cql1.should == cql2
    end

    it "should do multi-deletes" do
      cql = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, { :name => nil, :nick => ["old", nil] })
      cql.should == ["DELETE name, nick FROM test_table WHERE id = ?", [5]]
    end

    it "should do combined updates and deletes" do
      cql = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5 }, { :name => "test", :nick => nil })
      # mixed changes are wrapped in a single batch; the key arg appears once per statement
      cql.should == ["BEGIN BATCH UPDATE test_table SET name = ? WHERE id = ? DELETE nick FROM test_table WHERE id = ? APPLY BATCH", ["test", 5, 5]]
    end

    it "should handle compound primary keys" do
      # key columns that also show up in the changes hash, unchanged, stay out of the SET clause
      cql = Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5, :sub_id => "sub!" }, { :name => "test", :id => 5, :sub_id => [nil, "sub!"] })
      cql.should == ["UPDATE test_table SET name = ? WHERE id = ? AND sub_id = ?", ["test", 5, "sub!"]]
    end

    it "should disallow changing a primary key component" do
      expect {
        Canvas::Cassandra::Database.build_update_record_cql("test_table", { :id => 5, :sub_id => "sub!" }, { :name => "test", :id => 5, :sub_id => ["old", "sub!"]})
      }.to raise_error(ArgumentError)
    end
  end
end

View File

@ -17,6 +17,7 @@
#
require File.expand_path(File.dirname(__FILE__) + '/../spec_helper.rb')
require File.expand_path(File.dirname(__FILE__) + '/../cassandra_spec_helper.rb')
describe PageView do
before do
@ -25,11 +26,59 @@ describe PageView do
@page_view = PageView.new { |p| p.send(:attributes=, { :url => "http://test.one/", :session_id => "phony", :context => @course, :controller => 'courses', :action => 'show', :user_request => true, :render_time => 0.01, :user_agent => 'None', :account_id => Account.default.id, :request_id => "abcde", :interaction_seconds => 5, :user => @user }, false) }
end
# PageView persistence and pagination against the cassandra store; relies on the
# shared "cassandra page views" group for setup.
describe "cassandra page views" do
  it_should_behave_like "cassandra page views"

  it "should store and load from cassandra" do
    expect {
      @page_view.save!
    }.to change { PageView.cassandra.execute("select count(*) from page_views").fetch_row["count"] }.by(1)

    reloaded = PageView.find(@page_view.id)
    reloaded.should == @page_view
    expect { PageView.find("junk") }.to raise_error(ActiveRecord::RecordNotFound)
  end

  it "should paginate with a willpaginate-like array" do
    # some page views we shouldn't find
    2.times { page_view_model(:user => user_model) }
    user_model
    # four views for @user, spaced a week apart
    4.times { |i| page_view_model(:user => @user, :created_at => (5 - i).weeks.ago) }

    association = @user.page_views
    association.should be_a PageView::CassandraAssociation
    expect { association.paginate() }.to raise_exception(ArgumentError)

    everything = association.paginate(:per_page => 4)
    everything.size.should == 4
    everything.next_page.should be_nil

    first_two = association.paginate(:per_page => 2)
    first_two.should == everything[0, 2]
    first_two.next_page.should be_present

    last_two = association.paginate(:per_page => 2, :page => first_two.next_page)
    last_two.should == everything[2, 2]
    last_two.next_page.should be_nil
  end

  it "should halt pagination after a set time period" do
    recent = page_view_model(:user => @user)
    # 13 months old: expected to be left out of the results below
    page_view_model(:user => @user, :created_at => 13.months.ago)

    results = @user.page_views.paginate(:per_page => 3)
    results.should == [recent]
    results.next_page.should be_blank
  end

  it "should ignore an invalid page" do
    @page_view.save!
    listing = @user.page_views.paginate(:per_page => 2, :page => '3')
    listing.should == [@page_view]
  end
end
# With 'enable_page_views' set to 'db', #store must report success and leave
# exactly one PageView row that can be read back.
it "should store directly to the db in db mode" do
  Setting.set('enable_page_views', 'db')

  stored = @page_view.store
  stored.should be_true

  PageView.count.should == 1
  first_row = PageView.first
  first_row.should == @page_view
  found = PageView.find(@page_view.id)
  found.should == @page_view
end
if Canvas.redis_enabled?
@ -42,7 +91,7 @@ describe PageView do
PageView.count.should == 0
PageView.process_cache_queue
PageView.count.should == 1
PageView.first.attributes.except('created_at', 'updated_at', 'summarized').should == @page_view.attributes.except('created_at', 'updated_at', 'summarized')
PageView.find(@page_view.id).attributes.except('created_at', 'updated_at', 'summarized').should == @page_view.attributes.except('created_at', 'updated_at', 'summarized')
end
it "should store into redis in transactional batches" do
@ -56,15 +105,6 @@ describe PageView do
PageView.count.should == 3
end
# Simulates a redis timeout on connect and checks the page view is still
# stored as a PageView row.
it "should store directly to the db if redis is down" do
  Canvas::Redis.patch
  Redis::Client.any_instance.expects(:ensure_connected).raises(Redis::TimeoutError)

  stored = @page_view.store
  stored.should be_true
  PageView.count.should == 1

  # timestamps are excluded from the attribute comparison
  expected_attrs = @page_view.attributes.except('created_at', 'updated_at')
  PageView.first.attributes.except('created_at', 'updated_at').should == expected_attrs

  Canvas::Redis.reset_redis_failure
end
describe "batch transaction" do
self.use_transactional_fixtures = false
it "should not fail the batch if one row fails" do