improve cross-shard collection item and activity stream support
Create upvotes in the upvoting user's shard, so we can query for them, and make the upvote_count and post_count cached counts update properly cross-shard. Remove a foreign key constraint that was preventing users on other shards from posting items to a collection. Create stream_item_instances on the user's shard, and make sure to query them from there. Further work should be done to optimize :include so that we can efficient pull in the stream items for the instances. test plan: as a user on one shard, interact with a collection on another shard -- posting to it, upvoting/downvoting, cloning items from that and other shards. verify you don't get errors, missing data, or incorrect counts. Change-Id: I91aeebd404cd20663a533b2f38c08ec90c65868e Reviewed-on: https://gerrit.instructure.com/11228 Tested-by: Jenkins <jenkins@instructure.com> Reviewed-by: Cody Cutrer <cody@instructure.com> Reviewed-by: Simon Williams <simon@instructure.com>
This commit is contained in:
parent
84df9232d7
commit
9be4a8edea
|
@ -333,7 +333,7 @@ class CollectionItemsController < ApplicationController
|
|||
find_item_and_collection
|
||||
if authorized_action(@item, @current_user, :read)
|
||||
@upvote = find_upvote
|
||||
@upvote ||= @item.collection_item_data.collection_item_upvotes.create!({ :user => @current_user })
|
||||
@upvote ||= CollectionItemUpvote.create!(:user => @current_user, :collection_item_data => @item.data)
|
||||
render :json => collection_item_upvote_json(@item, @upvote, @current_user, session)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -60,38 +60,34 @@ class CollectionItem < ActiveRecord::Base
|
|||
DiscussionTopic.new(:context => self, :discussion_type => DiscussionTopic::DiscussionTypes::FLAT)
|
||||
end
|
||||
|
||||
alias_method :destroy!, :destroy
|
||||
def destroy
|
||||
self.workflow_state = 'deleted'
|
||||
save!
|
||||
end
|
||||
|
||||
trigger.after(:insert) do |t|
|
||||
t.where("NEW.workflow_state = 'active'") do
|
||||
<<-SQL
|
||||
UPDATE collection_item_datas
|
||||
SET post_count = post_count + 1
|
||||
WHERE id = NEW.collection_item_data_id;
|
||||
SQL
|
||||
end
|
||||
end
|
||||
after_save :update_post_count
|
||||
after_destroy :update_post_count
|
||||
|
||||
trigger.after(:update) do |t|
|
||||
t.where("NEW.workflow_state <> OLD.workflow_state") do
|
||||
<<-SQL
|
||||
UPDATE collection_item_datas
|
||||
SET post_count = post_count + CASE WHEN (NEW.workflow_state = 'active') THEN 1 ELSE -1 END
|
||||
WHERE id = NEW.collection_item_data_id;
|
||||
SQL
|
||||
def update_post_count
|
||||
increment = 0
|
||||
if self.id_changed?
|
||||
# was a new record
|
||||
increment = 1 if self.active?
|
||||
elsif self.destroyed?
|
||||
increment = -1
|
||||
elsif self.workflow_state_changed?
|
||||
if self.active?
|
||||
increment = 1
|
||||
else
|
||||
increment = -1
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
trigger.after(:delete) do |t|
|
||||
t.where("OLD.workflow_state = 'active'") do
|
||||
<<-SQL
|
||||
UPDATE collection_item_datas
|
||||
SET post_count = post_count - 1
|
||||
WHERE id = OLD.collection_item_data_id;
|
||||
SQL
|
||||
if increment != 0
|
||||
data.shard.activate do
|
||||
data.class.update_all(['post_count = post_count + ?', increment], :id => data.id)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -94,10 +94,10 @@ class CollectionItemData < ActiveRecord::Base
|
|||
attr_accessor :upvoted_by_user
|
||||
# sets the upvoted_by_user attribute on each item passed in
|
||||
def self.load_upvoted_by_user(datas, user)
|
||||
Shard.partition_by_shard(datas) do |datas_subset|
|
||||
data_ids = datas_subset.map(&:id)
|
||||
user.shard.activate do
|
||||
data_ids = datas.map(&:id)
|
||||
upvoted_ids = Set.new(connection.select_values(sanitize_sql_for_conditions(["SELECT collection_item_data_id FROM collection_item_upvotes WHERE collection_item_data_id IN (?) AND user_id = ?", data_ids, user.id])))
|
||||
datas_subset.each { |item| item.upvoted_by_user = upvoted_ids.include?(item.id.to_s) }
|
||||
datas.each { |item| item.upvoted_by_user = upvoted_ids.include?(item.id.to_s) }
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -25,19 +25,28 @@ class CollectionItemUpvote < ActiveRecord::Base
|
|||
validates_presence_of :collection_item_data, :user
|
||||
attr_readonly :collection_item_data_id, :user_id
|
||||
|
||||
trigger.after(:insert) do
|
||||
<<-SQL
|
||||
UPDATE collection_item_datas
|
||||
SET upvote_count = upvote_count + 1
|
||||
WHERE id = NEW.collection_item_data_id;
|
||||
SQL
|
||||
after_create :update_upvote_count
|
||||
after_destroy :update_upvote_count
|
||||
|
||||
# upvotes get saved to the user's shard, and then increment the counter
|
||||
# stored on the collection_item_data, wherever it may live
|
||||
set_shard_override do |record|
|
||||
record.user.shard
|
||||
end
|
||||
|
||||
trigger.after(:delete) do
|
||||
<<-SQL
|
||||
UPDATE collection_item_datas
|
||||
SET upvote_count = upvote_count - 1
|
||||
WHERE id = OLD.collection_item_data_id;
|
||||
SQL
|
||||
def update_upvote_count
|
||||
increment = 0
|
||||
if self.id_changed?
|
||||
# was a new record
|
||||
increment = 1
|
||||
elsif self.destroyed?
|
||||
increment = -1
|
||||
end
|
||||
|
||||
if increment != 0
|
||||
collection_item_data.shard.activate do
|
||||
collection_item_data.class.update_all(['upvote_count = upvote_count + ?', increment], :id => collection_item_data.id)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -218,12 +218,14 @@ class StreamItem < ActiveRecord::Base
|
|||
|
||||
# Then insert a StreamItemInstance for each user in user_ids
|
||||
instance_ids = []
|
||||
StreamItemInstance.transaction do
|
||||
user_ids.each do |user_id|
|
||||
i = res.stream_item_instances.build(:user_id => user_id)
|
||||
i.hidden = object.class == Submission && object.assignment.muted? ? true : false
|
||||
i.save
|
||||
instance_ids << i.id
|
||||
Shard.partition_by_shard(user_ids) do |user_ids_subset|
|
||||
StreamItemInstance.transaction do
|
||||
user_ids_subset.each do |user_id|
|
||||
i = StreamItemInstance.create(:user_id => user_id, :stream_item => res) do |sii|
|
||||
sii.hidden = object.class == Submission && object.assignment.muted? ? true : false
|
||||
end
|
||||
instance_ids << i.id
|
||||
end
|
||||
end
|
||||
end
|
||||
smallest_generated_id = instance_ids.min || 0
|
||||
|
|
|
@ -20,7 +20,7 @@ class StreamItemInstance < ActiveRecord::Base
|
|||
belongs_to :user
|
||||
belongs_to :stream_item
|
||||
|
||||
attr_accessible :user_id
|
||||
attr_accessible :user_id, :stream_item
|
||||
|
||||
before_save :set_context_code
|
||||
def set_context_code
|
||||
|
|
|
@ -125,7 +125,6 @@ class User < ActiveRecord::Base
|
|||
has_many :user_notes
|
||||
has_many :account_reports
|
||||
has_many :stream_item_instances, :dependent => :delete_all
|
||||
has_many :stream_items, :through => :stream_item_instances
|
||||
has_many :all_conversations, :class_name => 'ConversationParticipant', :include => :conversation
|
||||
has_many :favorites
|
||||
has_many :favorite_courses, :source => :course, :through => :current_and_invited_enrollments, :conditions => "EXISTS (SELECT 1 FROM favorites WHERE context_type = 'Course' AND context_id = enrollments.course_id AND user_id = enrollments.user_id)"
|
||||
|
@ -1681,24 +1680,24 @@ class User < ActiveRecord::Base
|
|||
end
|
||||
memoize :recent_feedback
|
||||
|
||||
def visible_stream_items(opts={})
|
||||
items = stream_items.scoped(:conditions => { 'stream_item_instances.hidden' => false }, :order => 'stream_item_instances.id desc')
|
||||
def visible_stream_item_instances(opts={})
|
||||
instances = stream_item_instances.scoped(:conditions => { 'stream_item_instances.hidden' => false }, :order => 'stream_item_instances.id desc', :include => :stream_item)
|
||||
|
||||
# dont make the query do an stream_items.context_code IN
|
||||
# dont make the query do an stream_item_instances.context_code IN
|
||||
# ('course_20033','course_20237','course_20247' ...) if they dont pass any
|
||||
# contexts, just assume it wants any context code.
|
||||
if opts[:contexts]
|
||||
# still need to optimize the query to use a root_context_code. that way a
|
||||
# users course dashboard even if they have groups does a query with
|
||||
# "context_code=..." instead of "context_code IN ..."
|
||||
items = items.scoped(:conditions => ['stream_item_instances.context_code in (?)', setup_context_lookups(opts[:contexts])])
|
||||
instances = instances.scoped(:conditions => ['stream_item_instances.context_code in (?)', setup_context_lookups(opts[:contexts])])
|
||||
end
|
||||
|
||||
items
|
||||
instances
|
||||
end
|
||||
|
||||
def recent_stream_items(opts={})
|
||||
visible_stream_items(opts).scoped(:limit => 21).all
|
||||
visible_stream_item_instances(opts).scoped(:include => :stream_item, :limit => 21).map(&:stream_item)
|
||||
end
|
||||
memoize :recent_stream_items
|
||||
|
||||
|
|
|
@ -1,83 +0,0 @@
|
|||
# This migration was auto-generated via `rake db:generate_trigger_migration'.
|
||||
# While you can edit this file, any changes you make to the definitions here
|
||||
# will be undone by the next auto-generated trigger migration.
|
||||
|
||||
class CreateTriggersForCollections < ActiveRecord::Migration
|
||||
tag :predeploy
|
||||
|
||||
def self.up
|
||||
create_trigger("collection_items_after_insert_row_tr", :generated => true, :compatibility => 1).
|
||||
on("collection_items").
|
||||
after(:insert) do |t|
|
||||
t.where("NEW.workflow_state = 'active'") do
|
||||
<<-SQL_ACTIONS
|
||||
UPDATE collection_item_datas
|
||||
SET post_count = post_count + 1
|
||||
WHERE id = NEW.collection_item_data_id;
|
||||
SQL_ACTIONS
|
||||
end
|
||||
end
|
||||
|
||||
create_trigger("collection_items_after_update_row_tr", :generated => true, :compatibility => 1).
|
||||
on("collection_items").
|
||||
after(:update) do |t|
|
||||
t.where("NEW.workflow_state <> OLD.workflow_state") do
|
||||
<<-SQL_ACTIONS
|
||||
UPDATE collection_item_datas
|
||||
SET post_count = post_count + CASE WHEN (NEW.workflow_state = 'active') THEN 1 ELSE -1 END
|
||||
WHERE id = NEW.collection_item_data_id;
|
||||
SQL_ACTIONS
|
||||
end
|
||||
end
|
||||
|
||||
create_trigger("collection_items_after_delete_row_tr", :generated => true, :compatibility => 1).
|
||||
on("collection_items").
|
||||
after(:delete) do |t|
|
||||
t.where("OLD.workflow_state = 'active'") do
|
||||
<<-SQL_ACTIONS
|
||||
UPDATE collection_item_datas
|
||||
SET post_count = post_count - 1
|
||||
WHERE id = OLD.collection_item_data_id;
|
||||
SQL_ACTIONS
|
||||
end
|
||||
end
|
||||
|
||||
create_trigger("collection_item_upvotes_after_insert_row_tr", :generated => true, :compatibility => 1).
|
||||
on("collection_item_upvotes").
|
||||
after(:insert) do
|
||||
<<-SQL_ACTIONS
|
||||
UPDATE collection_item_datas
|
||||
SET upvote_count = upvote_count + 1
|
||||
WHERE id = NEW.collection_item_data_id;
|
||||
SQL_ACTIONS
|
||||
end
|
||||
|
||||
create_trigger("collection_item_upvotes_after_delete_row_tr", :generated => true, :compatibility => 1).
|
||||
on("collection_item_upvotes").
|
||||
after(:delete) do
|
||||
<<-SQL_ACTIONS
|
||||
UPDATE collection_item_datas
|
||||
SET upvote_count = upvote_count - 1
|
||||
WHERE id = OLD.collection_item_data_id;
|
||||
SQL_ACTIONS
|
||||
end
|
||||
end
|
||||
|
||||
def self.down
|
||||
drop_trigger("collection_items_after_insert_row_tr", "collection_items", :generated => true)
|
||||
|
||||
drop_trigger("collection_items_after_insert_row_when_new_workflow_state_ac_tr", "collection_items", :generated => true)
|
||||
|
||||
drop_trigger("collection_items_after_update_row_tr", "collection_items", :generated => true)
|
||||
|
||||
drop_trigger("collection_items_after_update_row_when_new_workflow_state_ol_tr", "collection_items", :generated => true)
|
||||
|
||||
drop_trigger("collection_items_after_delete_row_tr", "collection_items", :generated => true)
|
||||
|
||||
drop_trigger("collection_items_after_delete_row_when_old_workflow_state_ac_tr", "collection_items", :generated => true)
|
||||
|
||||
drop_trigger("collection_item_upvotes_after_insert_row_tr", "collection_item_upvotes", :generated => true)
|
||||
|
||||
drop_trigger("collection_item_upvotes_after_delete_row_tr", "collection_item_upvotes", :generated => true)
|
||||
end
|
||||
end
|
|
@ -0,0 +1,12 @@
|
|||
class DropCollectionItemUserFk < ActiveRecord::Migration
|
||||
tag :predeploy
|
||||
|
||||
def self.up
|
||||
# the user can be on another shard for group collections
|
||||
remove_foreign_key :collection_items, :users
|
||||
end
|
||||
|
||||
def self.down
|
||||
add_foreign_key :collection_items, :users
|
||||
end
|
||||
end
|
|
@ -0,0 +1,34 @@
|
|||
# This migration was auto-generated via `rake db:generate_trigger_migration'.
|
||||
# While you can edit this file, any changes you make to the definitions here
|
||||
# will be undone by the next auto-generated trigger migration.
|
||||
|
||||
class DropTriggersForCollections < ActiveRecord::Migration
|
||||
tag :predeploy
|
||||
|
||||
# we had to switch to another strategy rather than triggers because the
|
||||
# different rows might be in different databases
|
||||
#
|
||||
# the triggers may not exist in the db, since we deleted the migration that
|
||||
# creates them -- drop_trigger will handle that gracefully
|
||||
def self.up
|
||||
drop_trigger("collection_items_after_insert_row_tr", "collection_items", :generated => true)
|
||||
|
||||
drop_trigger("collection_items_after_insert_row_when_new_workflow_state_ac_tr", "collection_items", :generated => true)
|
||||
|
||||
drop_trigger("collection_items_after_update_row_tr", "collection_items", :generated => true)
|
||||
|
||||
drop_trigger("collection_items_after_update_row_when_new_workflow_state_ol_tr", "collection_items", :generated => true)
|
||||
|
||||
drop_trigger("collection_items_after_delete_row_tr", "collection_items", :generated => true)
|
||||
|
||||
drop_trigger("collection_items_after_delete_row_when_old_workflow_state_ac_tr", "collection_items", :generated => true)
|
||||
|
||||
drop_trigger("collection_item_upvotes_after_insert_row_tr", "collection_item_upvotes", :generated => true)
|
||||
|
||||
drop_trigger("collection_item_upvotes_after_delete_row_tr", "collection_item_upvotes", :generated => true)
|
||||
end
|
||||
|
||||
def self.down
|
||||
raise ActiveRecord::IrreversibleMigration
|
||||
end
|
||||
end
|
|
@ -115,7 +115,10 @@ module Api::V1::StreamItem
|
|||
opts = {}
|
||||
opts[:contexts] = contexts if contexts.present?
|
||||
|
||||
scope = @current_user.visible_stream_items(opts)
|
||||
render :json => Api.paginate(scope, self, self.send(paginate_url, @context)).map { |i| stream_item_json(i, @current_user, session) }
|
||||
items = @current_user.shard.activate do
|
||||
scope = @current_user.visible_stream_item_instances(opts)
|
||||
Api.paginate(scope, self, self.send(paginate_url, @context)).to_a
|
||||
end
|
||||
render :json => items.map { |i| stream_item_json(i.stream_item, @current_user, session) }
|
||||
end
|
||||
end
|
||||
|
|
|
@ -17,9 +17,9 @@
|
|||
#
|
||||
|
||||
require File.expand_path(File.dirname(__FILE__) + '/../spec_helper')
|
||||
require File.expand_path(File.dirname(__FILE__) + '/../sharding_spec_helper')
|
||||
|
||||
describe 'CollectionItem' do
|
||||
|
||||
describe 'Stream Item' do
|
||||
it "should generate stream items for users following the collection" do
|
||||
group_with_user
|
||||
|
@ -32,10 +32,57 @@ describe 'CollectionItem' do
|
|||
|
||||
@item = collection_item_model(:collection => @coll, :user => @user2)
|
||||
|
||||
@user2.visible_stream_items.should be_empty
|
||||
items = @user1.visible_stream_items
|
||||
@user2.visible_stream_item_instances.should be_empty
|
||||
items = @user1.visible_stream_item_instances.map(&:stream_item)
|
||||
items.size.should == 1
|
||||
items.first.data.type.should == 'CollectionItem'
|
||||
end
|
||||
end
|
||||
|
||||
context "across shards" do
|
||||
it_should_behave_like "sharding"
|
||||
|
||||
it "should handle user upvotes on another shard" do
|
||||
@shard1.activate { @user1 = user_model }
|
||||
@shard2.activate { @item = collection_item_model(:collection => group_model.collections.create!, :user => user_model) }
|
||||
@upvote = CollectionItemUpvote.create!(:user => @user1, :collection_item_data => @item.data)
|
||||
@upvote.shard.should == @user1.shard
|
||||
@item.data.reload.upvote_count.should == 1
|
||||
|
||||
[ Shard.default, @shard1, @shard2 ].each do |shard|
|
||||
shard.activate do
|
||||
CollectionItemData.load_upvoted_by_user([@item.reload.data], @user1)
|
||||
@item.data.upvoted_by_user.should == true
|
||||
end
|
||||
end
|
||||
|
||||
@upvote.destroy
|
||||
@item.data.reload.upvote_count.should == 0
|
||||
|
||||
[ Shard.default, @shard1, @shard2 ].each do |shard|
|
||||
shard.activate do
|
||||
CollectionItemData.load_upvoted_by_user([@item.reload.data], @user1)
|
||||
@item.data.upvoted_by_user.should == false
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
it "should handle clones on another shard" do
|
||||
@shard1.activate { @user1 = user_model }
|
||||
@shard2.activate { @item1 = collection_item_model(:collection => group_model.collections.create!, :user => user_model) }
|
||||
@item2 = @user1.collections.create!.collection_items.create!(:collection_item_data => @item1.data, :user => @user1)
|
||||
@item2.shard.should == @user1.shard
|
||||
@data = @item1.data.reload
|
||||
@data.should == @item2.data
|
||||
@data.shard.should == @item1.shard
|
||||
@data.post_count.should == 2
|
||||
|
||||
@item2.destroy
|
||||
@data.reload.post_count.should == 1
|
||||
@item2.update_attribute(:workflow_state, 'active')
|
||||
@data.reload.post_count.should == 2
|
||||
@item2.destroy!
|
||||
@data.reload.post_count.should == 1
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#
|
||||
|
||||
require File.expand_path(File.dirname(__FILE__) + '/../spec_helper.rb')
|
||||
require File.expand_path(File.dirname(__FILE__) + '/../sharding_spec_helper')
|
||||
|
||||
describe StreamItem do
|
||||
it "should not infer a user_id for DiscussionTopic" do
|
||||
|
@ -34,11 +35,30 @@ describe StreamItem do
|
|||
notification_model(:name => 'Assignment Created')
|
||||
course_with_student(:active_all => true)
|
||||
assignment_model(:course => @course)
|
||||
item = @user.stream_items.first
|
||||
item = @user.stream_item_instances.first.stream_item
|
||||
item.data.notification_name.should == 'Assignment Created'
|
||||
item.context_code.should == @course.asset_string
|
||||
|
||||
course_items = @user.recent_stream_items(:contexts => [@course])
|
||||
course_items.should == [item]
|
||||
end
|
||||
|
||||
context "across shards" do
|
||||
it_should_behave_like "sharding"
|
||||
|
||||
it "should create stream items on the user's shard" do
|
||||
group_with_user
|
||||
@user1 = @user
|
||||
@user2 = @shard1.activate { user_model }
|
||||
@coll = @group.collections.create!
|
||||
|
||||
UserFollow.create_follow(@user1, @coll)
|
||||
UserFollow.create_follow(@user2, @coll)
|
||||
|
||||
@item = collection_item_model(:collection => @coll, :user => @user1)
|
||||
@shard1.activate do
|
||||
@user2.reload.visible_stream_item_instances.map { |i| i.stream_item.data.type }.should == ['CollectionItem']
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue