initial live events implementation

This commit adds a new module called LiveEvents that knows how to send a
certain set of events to Kinesis. The module is configured via
normal plugin settings per account. Once the plugin is configured with
a Kinesis stream, events will start getting sent to that stream.

Events are sent asynchronously, in a background thread.

test plan:
 * See `doc/live_events.md` for instructions on how to setup a local
   kinesis stream and configure the LiveEvents plugin.
 * Start tailing the stream with the command specified in
   `doc/live_events.md` in a terminal.
 * Perform the actions described in `doc/api/live_events.md` and verify
   that events show up in your Kinesis terminal with the correct data.

Change-Id: Id799688c972205a1eee84a673912f84b0c7abb57
Reviewed-on: https://gerrit.instructure.com/50324
Reviewed-by: Rob Orton <rob@instructure.com>
Tested-by: Jenkins
Reviewed-by: Jacob Fugal <jacob@instructure.com>
QA-Review: Jacob Fugal <jacob@instructure.com>
Product-Review: Zach Wily <zach@instructure.com>
This commit is contained in:
Zach Wily 2015-03-13 14:07:54 -06:00 committed by Zach Wily
parent 6821dfe030
commit 5d232f03af
40 changed files with 1419 additions and 7 deletions

View File

@ -26,7 +26,7 @@ gem 'folio-pagination', '0.0.7', :require => 'folio/rails'
gem 'will_paginate', '3.0.4', :require => false
gem "after_transaction_commit", '1.0.1'
gem "aws-sdk", '1.52.0'
gem "aws-sdk", '1.63.0'
gem 'uuidtools', '2.1.4'
gem 'barby', '0.5.0'
gem 'chunky_png', '1.3.0'
@ -137,6 +137,7 @@ gem 'html_text_helper', :path => 'gems/html_text_helper'
gem 'incoming_mail_processor', :path => 'gems/incoming_mail_processor'
gem 'json_token', :path => 'gems/json_token'
gem 'linked_in', :path => 'gems/linked_in'
gem 'live_events', :path => 'gems/live_events'
gem 'diigo', :path => 'gems/diigo'
gem 'lti_outbound', :path => 'gems/lti_outbound'
gem 'multipart', :path => 'gems/multipart'

View File

@ -6,6 +6,7 @@ group :test do
gem 'simplecov-rcov', '0.2.3', :require => false
gem 'bluecloth', '2.2.0' # for generating api docs
gem 'redcarpet', '3.0.0'
gem 'github-markdown', '0.6.8'
gem 'bullet_instructure', '4.0.3', :require => 'bullet'
gem 'mocha', github: 'maneframe/mocha', :ref => 'bb8813fbb4cc589d7c58073d93983722d61b6919', :require => false
gem 'metaclass', '0.0.2'
@ -27,7 +28,7 @@ group :test do
gem 'webmock', '1.16.1', :require => false
gem 'addressable', '2.3.5'
gem 'crack', '0.4.1'
gem 'yard', '0.8.0'
gem 'yard', '0.8.7.6'
gem 'yard-appendix', '>=0.1.8'
gem 'timecop', '0.6.3'
end

View File

@ -49,6 +49,7 @@ class ApplicationController < ActionController::Base
before_filter :set_page_view
before_filter :require_reacceptance_of_terms
before_filter :clear_policy_cache
before_filter :setup_live_events_context
after_filter :log_page_view
after_filter :discard_flash_if_xhr
after_filter :cache_buster
@ -59,6 +60,7 @@ class ApplicationController < ActionController::Base
before_filter :init_body_classes
after_filter :set_response_headers
after_filter :update_enrollment_last_activity_at
after_filter :teardown_live_events_context
include Tour
add_crumb(proc {
@ -507,6 +509,10 @@ class ApplicationController < ActionController::Base
set_badge_counts_for(@context, @current_user, @current_enrollment)
end
end
# There is lots of interesting information set up in here, that we want
# to place into the live events context.
setup_live_events_context
end
# This is used by a number of actions to retrieve a list of all contexts
@ -1882,5 +1888,37 @@ class ApplicationController < ActionController::Base
nil
end
helper_method :request_delete_account_link
end
def setup_live_events_context
ctx = {}
ctx[:root_account_id] = @domain_root_account.global_id if @domain_root_account
ctx[:user_id] = @current_user.global_id if @current_user
ctx[:real_user_id] = @real_current_user.global_id if @real_current_user
ctx[:user_login] = @current_pseudonym.unique_id if @current_pseudonym
ctx[:hostname] = request.host
ctx[:user_agent] = request.headers['User-Agent']
ctx[:context_type] = @context.class.to_s if @context
ctx[:context_id] = @context.global_id if @context
if @context_membership
ctx[:context_role] =
if @context_membership.respond_to?(:role)
@context_membership.role.name
elsif @context_membership.respond_to?(:type)
@context_membership.type
else
@context_membership.class.to_s
end
end
if tctx = Thread.current[:context]
ctx[:request_id] = tctx[:request_id]
ctx[:session_id] = tctx[:session_id]
end
LiveEvents.set_context(ctx)
end
def teardown_live_events_context
LiveEvents.clear_context!
end
end

View File

@ -643,6 +643,12 @@ class PseudonymSessionsController < ApplicationController
CanvasBreachMitigation::MaskingSecrets.reset_authenticity_token!(cookies)
Auditors::Authentication.record(@current_pseudonym, 'login')
# Since the user just logged in, we'll reset the context to include their info.
setup_live_events_context
# TODO: Only send this if the current_pseudonym's root account matches the current root
# account?
Canvas::LiveEvents.logged_in(session)
otp_passed ||= @current_user.validate_otp_secret_key_remember_me_cookie(cookies['canvas_otp_remember_me'], request.remote_ip)
if !otp_passed
mfa_settings = @current_user.mfa_settings
@ -712,6 +718,7 @@ class PseudonymSessionsController < ApplicationController
def logout_current_user
CanvasBreachMitigation::MaskingSecrets.reset_authenticity_token!(cookies)
Auditors::Authentication.record(@current_pseudonym, 'logout')
Canvas::LiveEvents.logged_out
Lti::LogoutService.queue_callbacks(@current_pseudonym)
super
end

View File

@ -345,6 +345,9 @@ class Quizzes::QuizSubmissionsApiController < ApplicationController
def complete
@service.complete @quiz_submission, params[:attempt]
# TODO: should this go in the service instead?
Canvas::LiveEvents.quiz_submitted(@quiz_submission)
serialize_and_render @quiz_submission
end

View File

@ -68,6 +68,8 @@ class Quizzes::QuizSubmissionsController < ApplicationController
@submission.record_answer(params_hash.dup)
flash[:notice] = t('errors.late_quiz', "You submitted this quiz late, and your answers may not have been recorded.") if @submission.overdue?
Quizzes::SubmissionGrader.new(@submission).grade_submission
Canvas::LiveEvents.quiz_submitted(@submission)
end
end
if session.delete('lockdown_browser_popup')

View File

@ -154,6 +154,7 @@ class Auditors::GradeChange
return unless submission
submission.shard.activate do
record = Auditors::GradeChange::Record.generate(submission, event_type)
Canvas::LiveEvents.grade_changed(submission, record.attributes['grade_before'])
Auditors::GradeChange::Stream.insert(record)
end
end

View File

@ -0,0 +1,50 @@
class LiveEventsObserver < ActiveRecord::Observer
observe :course,
:discussion_entry,
:discussion_topic,
:group,
:group_category,
:group_membership,
:wiki_page
def after_update(obj)
case obj
when Course
if obj.syllabus_body_changed?
Canvas::LiveEvents.course_syllabus_updated(obj, obj.syllabus_body_was)
end
when WikiPage
if obj.title_changed? || obj.body_changed?
Canvas::LiveEvents.wiki_page_updated(obj, obj.title_changed? ? obj.title_was : nil,
obj.body_changed? ? obj.body_was : nil)
end
end
end
def after_create(obj)
case obj
when DiscussionEntry
Canvas::LiveEvents.discussion_entry_created(obj)
when DiscussionTopic
Canvas::LiveEvents.discussion_topic_created(obj)
when Group
Canvas::LiveEvents.group_created(obj)
when GroupCategory
Canvas::LiveEvents.group_category_created(obj)
when GroupMembership
Canvas::LiveEvents.group_membership_created(obj)
when WikiPage
Canvas::LiveEvents.wiki_page_created(obj)
end
end
def after_save(obj)
end
def after_destroy(obj)
case obj
when WikiPage
Canvas::LiveEvents.wiki_page_deleted(obj)
end
end
end

View File

@ -0,0 +1,53 @@
<%= fields_for :settings, OpenObject.new(settings) do |f| %>
<table style="width: 500px;" class="formtable">
<tr>
<td colspan="2">
<p><%= t(:description, <<-TEXT)
Pushes events to a Kinesis stream.
TEXT
%></p>
</td>
</tr>
<tr>
<td><%= f.blabel :kinesis_stream_name, :en => "Kinesis Stream Name" %></td>
<td>
<%= f.text_field :kinesis_stream_name %>
</td>
</tr>
<tr>
<td colspan="2">
<p><%= t(:endpoint_region_requirement, <<-TEXT)
Either the AWS Region or Endpoint are required. The endpoint is typically only
used for development.
TEXT
%></p>
</td>
</tr>
<tr>
<td><%= f.blabel :aws_region, :en => "AWS Region" %></td>
<td>
<%= f.text_field :aws_region %>
</td>
</tr>
<tr>
<td><%= f.blabel :aws_endpoint, :en => "AWS Endpoint (url)" %></td>
<td>
<%= f.text_field :aws_endpoint %>
</td>
</tr>
<tr>
<td><%= f.blabel :aws_access_key_id, :en => "AWS Access Key ID" %></td>
<td>
<%= f.text_field :aws_access_key_id %>
</td>
</tr>
<tr>
<td><%= f.blabel :aws_secret_access_key, :en => "AWS Secret Access Key" %></td>
<td>
<%= f.password_field :aws_secret_access_key %>
</td>
</tr>
</table>
<% end %>

View File

@ -78,7 +78,7 @@ module CanvasRails
end
# Activate observers that should always be running
config.active_record.observers = [:cacher, :stream_item_cache]
config.active_record.observers = [:cacher, :stream_item_cache, :live_events_observer ]
config.autoload_paths += %W(#{Rails.root}/app/middleware
#{Rails.root}/app/observers

View File

@ -54,6 +54,12 @@ Delayed::Worker.lifecycle.around(:perform) do |worker, job, &block|
:session_id => worker.name,
}
LiveEvents.set_context({
:root_account_id => job.respond_to?(:global_account_id) ? job.global_account_id : nil,
:job_id => job.global_id,
:job_tag => job.tag
})
starting_mem = Canvas.sample_memory()
starting_cpu = Process.times()
lag = ((Time.now - job.run_at) * 1000).round
@ -75,6 +81,8 @@ Delayed::Worker.lifecycle.around(:perform) do |worker, job, &block|
system_cpu = ending_cpu.stime - starting_cpu.stime
Thread.current[:context] = nil
LiveEvents.clear_context!
Rails.logger.info "[STAT] #{starting_mem} #{ending_mem} #{ending_mem - starting_mem} #{user_cpu} #{system_cpu}"
end
end

View File

@ -0,0 +1,8 @@
Rails.configuration.to_prepare do
LiveEvents.logger = Rails.logger
LiveEvents.cache = Rails.cache
LiveEvents.statsd = CanvasStatsd::Statsd
LiveEvents.max_queue_size = -> { Setting.get('live_events_max_queue_size', 1000).to_i }
LiveEvents.plugin_settings = -> { Canvas::Plugin.find(:live_events) }
end

View File

@ -115,6 +115,20 @@ h2 .defined-in {
.method_details h4 {
}
table {
border: 1px solid gray;
border-collapse: collapse;
}
table th {
background-color: #DDD;
}
table th, table td {
border: 1px inset gray;
padding: 4px;
}
table.sis_csv {
border: 2px solid gray;
border-collapse: collapse;
@ -194,4 +208,4 @@ h2.api_method_name a {
div.appendix img {
max-width: 800px;
}
}

View File

@ -26,4 +26,8 @@
<a href="<%= url_for("file.homework_submission_tools.html") %>" class="<%= 'current' if options[:object] == 'file.homework_submission_tools.html' %>">Homework Submission</a>
<a href="<%= url_for("file.xapi.html") %>" class="<%= 'current' if options[:object] == 'file.xapi.html' %>">xAPI</a>
<a href="<%= url_for("file.tools_xml.html") %>" class="<%= 'current' if options[:object] == 'file.tools_xml.html' %>">Configuring</a>
<!-- still experimental, so we're not advertising it.
<h2>Other</h2>
<a href="<%= url_for("file.live_events.html") %>" class="<%= 'current' if options[:object] == 'file.live_events.html' %>">Live Events</a>
-->
</nav>

200
doc/api/live_events.md Normal file
View File

@ -0,0 +1,200 @@
Live Events (experimental)
==============
Live Events allows you to receive a set of real-time events happening in
your Canvas instance. The events are delivered to a queue which you are
then responsible for consuming. Supported events are described below.
Events are delivered in a "best-effort" fashion. In order to not slow
down web requests, events are sent asynchronously from web requests.
That means that there's a window where an event may happen, but the
process responsible for sending the request to the queue is not able to
queue it.
The currently supported queue is <a href="https://aws.amazon.com/sqs/">Amazon SQS</a>.
Live Events is currently an invite-only, experimental feature.
### Event Format
#### Attributes
Events are delivered with attributes and a body. The attributes are:
| Name | Type | Description | Example |
| ---- | ---- | ----------- | ------- |
| `event_name` | String | The name of the event. | `create_discussion_topic` |
| `event_time` | String.timestamp | The time, in ISO 8601 format. | `2015-03-18T15:15:54Z` |
#### Body (metadata)
The body is a JSON-formatted string with two keys: `metadata` and `data`.
`metadata` will be any or all of the following keys and values, if the
event originated as part of a web request:
| Name | Type | Description |
| ---- | ---- | ----------- |
| `user_id` | Number | The Canvas id of the currently logged in user. |
| `real_user_id` | Number | If the current user is being masqueraded, this is the Canvas id of the masquerading user. |
| `user_login` | String | The login of the current user. |
| `user_agent` | String | The User-Agent sent by the browser making the request. |
| `context_type` | String | The type of context where the event happened. |
| `context_id` | Number | The Canvas id of the current context. Always use the `context_type` when using this id to lookup the object. |
| `role` | String | The role of the current user in the current context. |
| `hostname` | String | The hostname of the current request |
| `request_id` | String | The identifier for this request. |
| `session_id` | String | The session identifier for this request. Can be used to correlate events in the same session for a user. |
For events originating as part of an asynchronous job, the following
fields may be set:
| Name | Type | Description |
| ---- | ---- | ----------- |
| `job_id` | Number | The identifier for the asynchronous job. |
| `job_tag` | String | A string identifying the type of job being performed. |
#### Body (data)
The `body` object will have key/value pairs with information specific to
each event, as described below.
Note: All Canvas ids are "global" identifiers.
### Supported Events
Note that the actual bodies of events may include more fields than
what's described in this document. Those fields are subject to change.
#### `syllabus_updated`
| Field | Description |
| ----- | ----------- |
| `course_id` | The Canvas id of the updated course. |
| `syllabus_body` | The new syllabus content (possibly truncated). |
| `old_syllabus_body` | The old syllabus content (possibly truncated). |
#### `discussion_entry_created`
| Field | Description |
| ----- | ----------- |
| `discussion_entry_id` | The Canvas id of the newly added entry. |
| `parent_discussion_entry_id` | If this was a reply, the Canvas id of the parent entry. |
| `parent_discussion_entry_author_id` | If this was a reply, the Canvas id of the parent entry author. |
| `discussion_topic_id` | The Canvas id of the topic the entry was added to. |
| `text` | The (possibly truncated) text of the post. |
#### `discussion_topic_created`
| Field | Description |
| ----- | ----------- |
| `discussion_topic_id` | The Canvas id of the new discussion topic. |
| `is_announcement` | `true` if this topic was posted as an announcement, `false` otherwise. |
| `title` | Title of the topic (possibly truncated). |
| `body` | Body of the topic (possibly truncated). |
#### `group_category_created`
| Field | Description |
| ----- | ----------- |
| `group_category_id` | The Canvas id of the newly created group category. |
| `group_category_name` | The name of the newly created group |
#### `group_created`
| Field | Description |
| ----- | ----------- |
| `group_id` | The Canvas id of the group the user is assigned to. |
| `group_name` | The name of the group the user is being assigned to. |
| `group_category_id` | The Canvas id of the group category. |
| `group_category_name` | The name of the group category. |
#### `group_membership_created`
Note: Only manual group assignments are currently sent. Groups where
people are automatically assigned to groups will not send `group_assign`
events.
| Field | Description |
| ----- | ----------- |
| `group_membership_id` | The Canvas id of the group membership. |
| `user_id` | The Canvas id of the user being assigned to a group. |
| `group_id` | The Canvas id of the group the user is assigned to. |
| `group_name` | The name of the group the user is being assigned to. |
| `group_category_id` | The Canvas id of the group category. |
| `group_category_name` | The name of the group category. |
#### `logged_in`
| Field | Description |
| ----- | ----------- |
| `redirect_url` | The URL the user was redirected to after logging in. Is set when the user logs in after clicking a deep link into Canvas. |
#### `logged_out`
No extra data.
#### `quiz_submitted`
| Field | Description |
| ----- | ----------- |
| `submission_id` | The Canvas id of the quiz submission. |
| `quiz_id` | The Canvas id of the quiz. |
#### `grade_changed`
`grade_change` events are posted every time a grade changes. These can
happen either as the result of a teacher changing a grade in the
gradebook or speedgrader, or with a quiz being automatically scored. In
the case of a quiz being scored, the `grade_change` event will be fired
as the result of a student turning in a quiz, and the `user_id` in the
message attributes will be of the student. In these cases, `grader_id`
should be null in the body.
| Field | Description |
| ----- | ----------- |
| `submission_id` | The Canvas id of the submission that the grade is changing on. |
| `grade` | The new grade. |
| `old_grade` | The previous grade, if there was one. |
| `grader_id` | The Canvas id of the user making the grade change. Null if this was the result of automatic grading. |
| `student_id` | The Canvas id of the student associated with the submission with the change. |
#### `wiki_page_created`
| Field | Description |
| ----- | ----------- |
| `wiki_page_id` | The Canvas id of the new wiki page. |
| `title` | The title of the new page (possibly truncated). |
| `body` | The body of the new page (possibly truncated). |
#### `wiki_page_updated`
| Field | Description |
| ----- | ----------- |
| `wiki_page_id` | The Canvas id of the changed wiki page. |
| `title` | The new title (possibly truncated). |
| `old_title` | The old title (possibly truncated). |
| `body` | The new page body (possibly truncated). |
| `old_body` | The old page body (possibly truncated). |
#### `wiki_page_deleted`
| Field | Description |
| ----- | ----------- |
| `wiki_page_id` | The Canvas id of the delete wiki page. |
| `title` | The title of the deleted wiki page (possibly truncated). |

37
doc/live_events.md Normal file
View File

@ -0,0 +1,37 @@
# Live Events
Canvas includes the ability to push a subset of real-time events to a
Kinesis stream, which can then be consumed for various analytics
purposes. This is not a full-fidelity feed of all changes to the
database, but a targetted set of interesting actions such as
`grade_changed`, `login`, etc.
## Development and Testing
To enabled Live Events, you need to configure the plugin in the /plugins
interface. If using the docker-compose dev setup, there is already a
"fake kinesis" running that you can use. Make sure you have the `aws`
cli installed, and run the following command to create a stream (with
canvas running):
```bash
AWS_ACCESS_KEY_ID=key AWS_SECRET_ACCESS_KEY=secret aws --endpoint-url http://kinesis.docker/ kinesis create-stream --stream-name=mystream --shard-count=1
```
Once the stream is created, configure your Canvas (at /plugins) to use
it:
| Setting Name | Value |
| --------------------- | ------------------- |
| Kinesis Stream Name | mystream |
| AWS Region | (leave blank) |
| AWS Endpoint | http://kinesis:4567 |
| AWS Access Key ID | key |
| AWS Secret Access Key | secret |
Restart Canvas, and events should start flowing to your kinesis stream.
You can view the stream with the `tail_kinesis` tool:
```bash
fig run --rm web script/tail_kinesis kinesis 4567 mystream
```

View File

@ -9,6 +9,17 @@ selenium:
ports:
- "5900:5900"
kinesis:
build: ./docker-compose/kinesalite
command: "--path /var/lib/kinesalite"
volumes:
- "tmp/cache:/var/lib/kinesalite"
ports:
- "4567:4567"
environment:
VIRTUAL_HOST: kinesis.docker
VIRTUAL_PORT: 4567
web:
build: ./docker-compose
command: bundle exec bin/rails s
@ -20,6 +31,7 @@ web:
- postgres
- redis
- selenium
- kinesis
environment:
RACK_ENV: development
VIRTUAL_HOST: canvas.docker
@ -32,6 +44,7 @@ jobs:
links:
- postgres
- redis
- kinesis
environment:
RACK_ENV: development

View File

@ -0,0 +1,5 @@
FROM dockerfile/nodejs:latest
RUN npm install kinesalite --global
EXPOSE 4567
ENTRYPOINT ["/usr/local/bin/kinesalite"]

7
gems/live_events/Gemfile Normal file
View File

@ -0,0 +1,7 @@
source 'https://rubygems.org'
gem 'aws-sdk'
gemspec
gem 'canvas_statsd', path: '../canvas_statsd'

View File

@ -0,0 +1 @@
require "bundler/gem_tasks"

View File

@ -0,0 +1,80 @@
#
# Copyright (C) 2014 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
require 'canvas_statsd'
module LiveEvents
class << self
attr_accessor :logger, :cache, :statsd
def plugin_settings=(settings)
@plugin_settings = settings
end
def plugin_settings
@plugin_settings.call
end
def max_queue_size=(size)
@max_queue_size = size
end
def max_queue_size
@max_queue_size.call
end
require 'live_events/client'
require 'live_events/async_worker'
# Set (on the current thread) the context to be used for future calls to post_event.
def set_context(ctx)
Thread.current[:live_events_ctx] = ctx
end
def clear_context!
Thread.current[:live_events_ctx] = nil
end
# Post an event for the current account.
def post_event(event_name, payload, time = Time.now, ctx = nil, partition_key = nil)
if config = LiveEvents::Client.config
ctx ||= Thread.current[:live_events_ctx]
LiveEvents::Client.new(config).post_event(event_name, payload, time, ctx, partition_key)
end
end
def truncate(string)
if string
string.truncate(Setting.get('live_events_text_max_length', 8192).to_i, separator: ' ')
end
end
def worker
if !@launched_pid || @launched_pid != Process.pid || @worker.stopped?
if @launched_pid && @launched_pid != Process.pid
logger.warn "Starting new LiveEvents worker thread due to fork."
end
@worker = LiveEvents::AsyncWorker.new
@launched_pid = Process.pid
end
@worker
end
end
end

View File

@ -0,0 +1,78 @@
#
# Copyright (C) 2014 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
require 'thread'
module LiveEvents
# TODO: Consider refactoring out common functionality from this and
# CanvasPandaPub::AsyncWorker. Their semantics are a bit different so
# it may not make sense.
# TODO: Consider adding batched requests. Kinesis has a put_records call
# that is more efficient. (Would also require using aws-sdk-v2 instead of v1.)
#
# If we do that, we'll want to add an at_exit handler that flushes out the
# queue for cases when the process is shutting down.
class AsyncWorker
def initialize(start_thread = true)
@queue = Queue.new
@logger = LiveEvents.logger
self.start! if start_thread
end
def push(p)
if @queue.length >= LiveEvents.max_queue_size
return false
end
@queue << p
true
end
def stopped?
@thread.nil? || !@thread.alive?
end
def stop!
@queue << :stop
@thread.join
end
def start!
@thread = Thread.new { self.run_thread }
end
def run_thread
loop do
p = @queue.pop
break if p == :stop
begin
p.call
rescue Exception => e
@logger.error("Exception making LiveEvents async call: #{e}")
end
end
end
end
end

View File

@ -0,0 +1,102 @@
#
# Copyright (C) 2014 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
require 'aws-sdk'
require 'json'
require 'active_support'
require 'active_support/core_ext/object/blank'
module LiveEvents
class Client
def self.config
res = LiveEvents.plugin_settings.try(:settings)
return nil unless res && !res['kinesis_stream_name'].blank? &&
!res['aws_access_key_id'].blank? &&
!res['aws_secret_access_key_dec'].blank? &&
(!res['aws_region'].blank? || !res['aws_endpoint'].blank?)
res.dup
end
def initialize(config = nil)
config ||= LiveEvents::Client.config
@kinesis = AWS::Kinesis.new(Client.aws_config(config)).client
@stream_name = config['kinesis_stream_name']
end
def self.aws_config(plugin_config)
aws = {
:access_key_id => plugin_config['aws_access_key_id'],
:secret_access_key => plugin_config['aws_secret_access_key_dec'],
}
if !plugin_config['aws_region'].blank?
aws[:region] = plugin_config['aws_region']
end
if !plugin_config['aws_endpoint'].blank?
uri = URI.parse(plugin_config['aws_endpoint'])
aws[:kinesis_endpoint] = uri.host
aws[:kinesis_port] = uri.port
aws[:use_ssl] = (uri.scheme == "https")
end
aws
end
def post_event(event_name, payload, time = Time.now, ctx = {}, partition_key = nil)
statsd_prefix = "live_events.events.#{event_name}"
ctx ||= {}
attributes = ctx.merge({
event_name: event_name,
event_time: time.utc.iso8601
})
event = {
attributes: attributes,
body: payload
}
# We don't care too much about the partition key, but it seems safe to
# let it be the user_id when that's available.
partition_key ||= (ctx["user_id"] && ctx["user_id"].try(:to_s)) || rand(1000).to_s
event_json = JSON.dump(event)
job = Proc.new {
begin
@kinesis.put_record(stream_name: @stream_name,
data: event_json,
partition_key: partition_key)
LiveEvents.statsd.increment("#{statsd_prefix}.sends") if LiveEvents.statsd
rescue => e
LiveEvents.logger.error("Error posting event #{e} event: #{event_json}")
LiveEvents.statsd.increment("#{statsd_prefix}.send_errors") if LiveEvents.statsd
end
}
unless LiveEvents.worker.push(job)
LiveEvents.logger.error("Error queueing job for worker event: #{event_json}")
LiveEvents.statsd.increment("#{statsd_prefix}.queue_full_errors") if LiveEvents.statsd
end
end
end
end

View File

@ -0,0 +1,27 @@
# coding: utf-8
lib = File.expand_path('../lib', __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
Gem::Specification.new do |spec|
spec.name = "live_events"
spec.version = "1.0.0"
spec.authors = ["Zach Wily"]
spec.email = ["zach@instructure.com"]
spec.summary = %q{LiveEvents}
spec.files = Dir.glob("{lib,spec}/**/*") + %w(test.sh)
spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) }
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ["lib"]
spec.add_dependency "aws-sdk-v1"
spec.add_dependency "canvas_statsd"
spec.add_dependency "activesupport"
spec.add_development_dependency "bundler", "~> 1.5"
spec.add_development_dependency "rake"
spec.add_development_dependency "rspec", "2.99.0"
spec.add_development_dependency "mocha"
end

View File

View File

@ -0,0 +1,48 @@
#
# Copyright (C) 2014 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
require 'spec_helper'
describe LiveEvents::AsyncWorker do
before(:each) do
LiveEvents.max_queue_size = -> { 100 }
LiveEvents.logger = mock()
@worker = LiveEvents::AsyncWorker.new(false)
end
describe "push" do
it "should execute stuff pushed on the queue" do
fired = false
@worker.push -> { fired = true }
@worker.start!
@worker.stop!
expect(fired).to be true
end
it "should reject items when queue is full" do
LiveEvents.max_queue_size = -> { 5 }
5.times { expect(@worker.push -> {}).to be_truthy }
expect(@worker.push -> {}).to be false
end
end
end

View File

@ -0,0 +1,155 @@
#
# Copyright (C) 2014 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
require 'spec_helper'
require 'aws-sdk-v1'
describe LiveEvents::Client do
def stub_config(opts = {})
LiveEvents::Client.stubs(:config).returns({
'kinesis_stream_name' => 'stream',
'aws_access_key_id' => 'access_key',
'aws_secret_access_key_dec' => 'secret_key',
'aws_region' => 'us-east-1'
})
end
before(:each) do
stub_config
LiveEvents.logger = mock()
LiveEvents.max_queue_size = -> { 100 }
@kclient = mock()
kinesis = mock()
kinesis.stubs(:client).returns(@kclient)
AWS::Kinesis.stubs(:new).returns(kinesis)
@client = LiveEvents::Client.new
end
def expect_put_record(payload)
@kclient.expects(:put_record).with() { |params|
params = params.merge({ data: JSON.parse(params[:data]) })
params == payload
}
end
describe "config" do
it "should correctly parse the endpoint" do
res = LiveEvents::Client.aws_config({
"aws_endpoint" => "http://example.com:6543/"
})
res[:kinesis_endpoint].should eq("example.com")
res[:kinesis_port].should eq(6543)
res[:use_ssl].should eq(false)
end
end
describe "post_event" do
now = Time.now
it "should call put_record on the kinesis stream" do
expect_put_record({
stream_name: 'stream',
data: {
"attributes" => {
"event_name" => 'event',
"event_time" => now.utc.iso8601
},
"body" => {}
},
partition_key: "123"
})
@client.post_event('event', {}, now, {}, "123")
LiveEvents.worker.stop!
end
it "should include attributes when supplied via ctx" do
now = Time.now
expect_put_record({
stream_name: 'stream',
data: {
"attributes" => {
"event_name" => 'event',
"event_time" => now.utc.iso8601,
"user_id" => 123,
"real_user_id" => 321,
"login" => 'loginname',
"user_agent" => 'agent'
},
"body" => {}
},
partition_key: 'pkey'
})
@client.post_event('event', {}, now, { user_id: 123, real_user_id: 321, login: 'loginname', user_agent: 'agent' }, 'pkey')
LiveEvents.worker.stop!
end
end
describe "LiveEvents helper" do
it "should set context info via set_context and send it with events" do
LiveEvents.set_context({ user_id: 123 })
now = Time.now
expect_put_record({
stream_name: 'stream',
data: {
"attributes" => {
"event_name" => 'event',
"event_time" => now.utc.iso8601,
"user_id" => 123
},
"body" => {}
},
partition_key: 'pkey'
})
LiveEvents.post_event('event', {}, now, nil, 'pkey')
LiveEvents.worker.stop!
end
it "should clear context on clear_context!" do
LiveEvents.set_context({ user_id: 123 })
LiveEvents.clear_context!
now = Time.now
expect_put_record({
stream_name: 'stream',
data: {
"attributes" => {
"event_name" => 'event',
"event_time" => now.utc.iso8601
},
"body" => {}
},
partition_key: 'pkey'
})
LiveEvents.post_event('event', {}, now, nil, 'pkey')
LiveEvents.worker.stop!
end
end
end

View File

@ -0,0 +1,15 @@
require 'live_events'
require 'thread'
RSpec.configure do |config|
config.treat_symbols_as_metadata_keys_with_true_values = true
config.run_all_when_everything_filtered = true
config.filter_run :focus
config.color = true
config.order = 'random'
config.mock_framework = :mocha
end
Thread.abort_on_exception = true
require 'mocha'

15
gems/live_events/test.sh Executable file
View File

@ -0,0 +1,15 @@
#!/bin/bash
result=0
echo "################ live_events ################"
bundle check || bundle install
bundle exec rspec spec
let result=$result+$?
if [ $result -eq 0 ]; then
echo "SUCCESS"
else
echo "FAILURE"
fi
exit $result

127
lib/canvas/live_events.rb Normal file
View File

@ -0,0 +1,127 @@
module Canvas::LiveEvents
def self.course_syllabus_updated(course, old_syllabus_body)
LiveEvents.post_event('syllabus_updated', {
course_id: course.global_id,
syllabus_body: LiveEvents.truncate(course.syllabus_body),
old_syllabus_body: LiveEvents.truncate(old_syllabus_body)
})
end
def self.discussion_entry_created(entry)
payload = {
discussion_entry_id: entry.global_id,
discussion_topic_id: entry.global_discussion_topic_id,
text: LiveEvents.truncate(entry.message)
}
if entry.parent_id
payload.merge!({
parent_discussion_entry_id: entry.global_parent_id
})
end
LiveEvents.post_event('discussion_entry_created', payload)
end
def self.discussion_topic_created(topic)
LiveEvents.post_event('discussion_topic_created', {
discussion_topic_id: topic.global_id,
is_announcement: topic.is_announcement,
title: LiveEvents.truncate(topic.title),
body: LiveEvents.truncate(topic.message)
})
end
def self.group_membership_created(membership)
LiveEvents.post_event('group_membership_created', {
group_membership_id: membership.global_id,
user_id: membership.global_user_id,
group_id: membership.global_group_id,
group_name: membership.group.name,
group_category_id: membership.group.global_group_category_id,
group_category_name: membership.group.group_category.try(:name)
})
end
def self.group_category_created(group_category)
LiveEvents.post_event('group_category_created', {
group_category_id: group_category.global_id,
group_category_name: group_category.name
})
end
def self.group_created(group)
LiveEvents.post_event('group_created', {
group_category_id: group.global_group_category_id,
group_category_name: group.group_category.try(:name),
group_id: group.global_id,
group_name: group.name
})
end
def self.logged_in(session)
LiveEvents.post_event('logged_in', {
redirect_url: session[:return_to]
})
end
def self.logged_out
LiveEvents.post_event('logged_out', {})
end
def self.quiz_submitted(submission)
# TODO: include score, for automatically graded portions?
LiveEvents.post_event('quiz_submitted', {
submission_id: submission.global_id,
quiz_id: submission.global_quiz_id
})
end
def self.wiki_page_created(page)
LiveEvents.post_event('wiki_page_created', {
wiki_page_id: page.global_id,
title: LiveEvents.truncate(page.title),
body: LiveEvents.truncate(page.body)
})
end
def self.wiki_page_updated(page, old_title, old_body)
payload = {
wiki_page_id: page.global_id,
title: page.title,
body: page.body
}
if old_title
payload[:old_title] = old_title
end
if old_body
payload[:old_body] = old_body
end
LiveEvents.post_event('wiki_page_updated', payload)
end
def self.wiki_page_deleted(page)
LiveEvents.post_event('wiki_page_deleted', {
wiki_page_id: page.global_id,
title: LiveEvents.truncate(page.title)
})
end
def self.grade_changed(submission, old_grade)
grader_id = nil
if submission.grader_id && !submission.autograded?
grader_id = submission.global_grader_id
end
LiveEvents.post_event('grade_change', {
submission_id: submission.global_id,
grade: submission.grade,
old_grade: old_grade,
grader_id: grader_id,
student_id: submission.global_user_id
})
end
end

View File

@ -308,5 +308,21 @@ Canvas::Plugin.register('pandapub', nil, {
:settings_partial => 'plugins/panda_pub_settings',
:validator => 'PandaPubValidator'
})
Canvas::Plugins::TicketingSystem.register!
Canvas::Plugin.register('live_events', nil, {
:name => lambda{ t :name, 'Live Events' },
:description => lambda{ t :description, 'Service for real-time events.' },
:author => 'Instructure',
:author_website => 'http://www.instructure.com',
:version => '1.0.0',
:settings => {
:kinesis_stream_name => nil,
:aws_access_key_id => nil,
:aws_secret_access_key => nil,
:aws_region => 'us-east-1',
:aws_endpoint => nil,
},
:encrypted_settings => [ :aws_secret_access_key ],
:settings_partial => 'plugins/live_events_settings',
:validator => 'LiveEventsValidator'
})

View File

@ -0,0 +1,57 @@
#
# Copyright (C) 2013 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
module Canvas::Plugins::Validators::LiveEventsValidator
def self.validate(settings, plugin_setting)
if settings.map(&:last).all?(&:blank?)
{}
else
err = false
if settings[:kinesis_stream_name].blank?
plugin_setting.errors.add(:base, I18n.t('canvas.plugins.errors.stream_name_required', 'The kinesis stream name is required.'))
err = true
end
if settings[:aws_endpoint].blank? && settings[:aws_region].blank?
plugin_setting.errors.add(:base, I18n.t('canvas.plugins.errors.endpoint_or_region_required', 'The AWS region (or endpoint) is required.'))
err = true
end
if !settings[:aws_endpoint].blank?
uri = URI.parse(settings[:aws_endpoint].strip) rescue nil
if !uri
plugin_setting.errors.add(:base, I18n.t('canvas.plugins.errors.invalid_live_events_url', 'Invalid endpoint, must be a valid URL.'))
end
end
if settings[:aws_access_key_id].blank? || settings[:aws_secret_access_key].blank?
plugin_setting.errors.add(:base, I18n.t('canvas.plugins.errors.aws_creds_required', 'The AWS credentials are required.'))
err = true
end
if err
return false
else
return settings.slice(:kinesis_stream_name, :aws_access_key_id, :aws_secret_access_key, :aws_region, :aws_endpoint)
end
end
end
end

39
script/tail_kinesis Executable file
View File

@ -0,0 +1,39 @@
#!/usr/bin/env ruby
require File.expand_path('../../config/boot', __FILE__)
require 'aws-sdk-v1'
require 'json'
require 'pp'
HOSTNAME = ARGV[0]
PORT = ARGV[1]
STREAM_NAME = ARGV[2]
@kinesis = AWS::Kinesis.new(
access_key_id: 'key',
secret_access_key: 'secret',
kinesis_endpoint: HOSTNAME,
kinesis_port: PORT,
use_ssl: false
).client
res = @kinesis.get_shard_iterator(
stream_name: STREAM_NAME,
shard_id: "shardId-000000000000",
shard_iterator_type: "TRIM_HORIZON"
)
iterator = res.data[:shard_iterator]
loop do
res = @kinesis.get_records(shard_iterator: iterator)
res.data[:records].each do |record|
body = record[:data]
pp JSON.parse(body)
puts "---"
end
iterator = res.data[:next_shard_iterator]
sleep 1
end

View File

@ -21,7 +21,9 @@ require File.expand_path(File.dirname(__FILE__) + '/../sharding_spec_helper')
describe ApplicationController do
before :each do
controller.stubs(:request).returns(stub(:host_with_port => "www.example.com"))
controller.stubs(:request).returns(stub(:host_with_port => "www.example.com",
:host => "www.example.com",
:headers => {}))
end
describe "#twitter_connection" do
@ -438,6 +440,8 @@ describe ApplicationController do
acct.save!
controller.instance_variable_set(:@domain_root_account, acct)
req = mock()
req.stubs(:host).returns('www.example.com')
req.stubs(:headers).returns({})
controller.stubs(:request).returns(req)
controller.send(:assign_localizer)

View File

@ -0,0 +1,31 @@
#
# Copyright (C) 2011 - 2014 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
require File.expand_path(File.dirname(__FILE__) + '/../sharding_spec_helper')
describe LiveEvents do
before :once do
user_with_pseudonym(:username => 'jtfrd@instructure.com', :active_all => 1, :password => 'qwerty')
end
it "should trigger a live event on login" do
Canvas::LiveEvents.expects(:logged_in).once
post '/login', { :pseudonym_session => { :unique_id => 'jtfrd@instructure.com', :password => 'qwerty' } }
expect(response).to be_redirect
end
end

View File

@ -78,6 +78,8 @@ describe Quizzes::QuizSubmissionsController do
end
def submit_quiz
Canvas::LiveEvents.expects(:quiz_submitted).with(@qs)
post "/courses/#{@course.id}/quizzes/#{@quiz.id}/submissions/",
:question_1 => 'password', :attempt => 1, :validation_token => @qs.validation_token
expect(response).to be_redirect

View File

@ -0,0 +1,103 @@
#
# Copyright (C) 2011 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
require File.expand_path(File.dirname(__FILE__) + '/../../spec_helper.rb')
describe Canvas::LiveEvents do
# The only methods tested in here are ones that have any sort of logic happening.
describe ".wiki_page_updated" do
before(:each) do
course_with_teacher
@page = @course.wiki.wiki_pages.create(:title => "old title", :body => "old body")
end
it "should not set old_title or old_body if they don't change" do
@page.save
LiveEvents.expects(:post_event).with('wiki_page_updated', {
wiki_page_id: @page.global_id,
title: "old title",
body: "old body"
})
Canvas::LiveEvents.wiki_page_updated(@page)
end
it "should set old_title if the title changed" do
@page.title = "new title"
@page.save
LiveEvents.expects(:post_event).with('wiki_page_updated', {
wiki_page_id: @page.global_id,
title: "new title",
old_title: "old title",
body: "old body"
})
Canvas::LiveEvents.wiki_page_updated(@page)
end
it "should set old_body if the body changed" do
@page.body = "new body"
@page.save
LiveEvents.expects(:post_event).with('wiki_page_updated', {
wiki_page_id: @page.global_id,
title: "old title",
body: "new body",
old_body: "old body"
})
Canvas::LiveEvents.wiki_page_updated(@page)
end
end
describe ".grade_changed" do
it "should set the grader to nil for an autograded quiz" do
quiz_with_graded_submission([])
LiveEvents.expects(:post_event).with('grade_change', {
submission_id: @quiz_submission.submission.global_id,
grade: @quiz_submission.submission.grade,
old_grade: 0,
grader_id: nil,
student_id: @quiz_submission.user.global_id
})
Canvas::LiveEvents.grade_changed(@quiz_submission.submission, 0)
end
it "should set the grader when a teacher grades an assignment" do
course_with_student_submissions
submission = @course.assignments.first.submissions.first
LiveEvents.expects(:post_event).with('grade_change', {
submission_id: submission.global_id,
grade: 10,
old_grade: 0,
grader_id: @teacher.global_id,
student_id: @student.global_id
})
submission.grader = @teacher
submission.grade = 10
Canvas::LiveEvents.grade_changed(submission, 0)
end
end
end

View File

@ -1,6 +1,7 @@
require File.expand_path(File.dirname(__FILE__) + '/../spec_helper.rb')
require File.expand_path(File.dirname(__FILE__) + '/../sharding_spec_helper')
require 'db/migrate/20150312155754_remove_discussion_entry_attachment_foreign_key'
describe 'RemoveDiscussionEntryAttachmentForeignKey' do

View File

@ -0,0 +1,58 @@
#
# Copyright (C) 2015 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
require File.expand_path(File.dirname(__FILE__) + '/../spec_helper')
describe LiveEventsObserver do
it "should post an event when a course syllabus changes" do
c = course
c.syllabus_body = "old syllabus"
c.save!
Canvas::LiveEvents.expects(:course_syllabus_updated).never
c.save!
c.syllabus_body = "new syllabus"
Canvas::LiveEvents.expects(:course_syllabus_updated).with(c, "old syllabus")
c.save
end
it "should post an event when a wiki page body or title changes" do
c = course
p = c.wiki.wiki_pages.create(:title => 'old title', :body => 'old body')
Canvas::LiveEvents.expects(:wiki_page_updated).never
p.touch
Canvas::LiveEvents.expects(:wiki_page_updated).with(p, 'old title', nil)
p.title = 'new title'
p.save
Canvas::LiveEvents.expects(:wiki_page_updated).with(p, nil, 'old body')
p.body = 'new body'
p.save
end
it "should post an event when a discussion topic is created" do
c = course
Canvas::LiveEvents.expects(:discussion_topic_created).once
c.discussion_topics.create!(:message => 'test')
end
end

View File

@ -824,6 +824,7 @@ RSpec.configure do |config|
# object_id should make it unique (but obviously things will fail if
# it tries to load it from the db.)
pseudonym.stubs(:id).returns(pseudonym.object_id)
pseudonym.stubs(:unique_id).returns('unique_id')
end
session = stub('PseudonymSession', :record => pseudonym, :session_credentials => nil)