Notifications: extract central Notifier, cordon off the internal Fanout implementation, and segregate instrumentation concerns

This commit is contained in:
Jeremy Kemper 2009-11-28 12:49:07 -08:00
parent 02893d1705
commit 4f2a04cc08
6 changed files with 292 additions and 339 deletions

View File

@ -632,13 +632,15 @@ class FragmentCachingTest < ActionController::TestCase
def test_fragment_for_logging
fragment_computed = false
ActiveSupport::Notifications.queue.expects(:publish).times(2)
events = []
ActiveSupport::Notifications.subscribe { |*args| events << args }
buffer = 'generated till now -> '
@controller.fragment_for(buffer, 'expensive') { fragment_computed = true }
assert fragment_computed
assert_equal 'generated till now -> ', buffer
assert_equal [:fragment_exist?, :write_fragment], events.map(&:first)
end
end

View File

@ -1,7 +1,4 @@
require 'thread'
require 'active_support/core_ext/module/delegation'
require 'active_support/core_ext/module/attribute_accessors'
require 'active_support/secure_random'
module ActiveSupport
# Notifications provides an instrumentation API for Ruby. To instrument an
@ -41,155 +38,42 @@ module ActiveSupport
# to subscribers in a thread. You can use any queue implementation you want.
#
module Notifications
mattr_accessor :queue
autoload :Instrumenter, 'active_support/notifications/instrumenter'
autoload :Event, 'active_support/notifications/instrumenter'
autoload :Fanout, 'active_support/notifications/fanout'
class << self
delegate :instrument, :transaction_id, :transaction, :to => :instrumenter
attr_writer :notifier
delegate :publish, :subscribe, :instrument, :to => :notifier
def instrumenter
Thread.current[:notifications_instrumeter] ||= Instrumenter.new(publisher)
end
def publisher
@publisher ||= Publisher.new(queue)
end
def subscribe(pattern=nil, &block)
Subscriber.new(queue).bind(pattern).subscribe(&block)
def notifier
@notifier ||= Notifier.new
end
end
class Instrumenter
def initialize(publisher)
@publisher = publisher
@id = random_id
end
def transaction
@id, old_id = random_id, @id
yield
ensure
@id = old_id
end
def transaction_id
@id
end
def instrument(name, payload={})
time = Time.now
result = yield if block_given?
ensure
@publisher.publish(name, time, Time.now, result, @id, payload)
end
private
def random_id
SecureRandom.hex(10)
end
end
class Publisher
def initialize(queue)
class Notifier
def initialize(queue = Fanout.new)
@queue = queue
end
def publish(*args)
@queue.publish(*args)
end
end
class Subscriber
def initialize(queue)
@queue = queue
end
def bind(pattern)
@pattern = pattern
self
end
def subscribe
@queue.subscribe(@pattern) do |*args|
yield(*args)
end
end
end
class Event
attr_reader :name, :time, :end, :transaction_id, :result, :payload
def initialize(name, start, ending, result, transaction_id, payload)
@name = name
@payload = payload.dup
@time = start
@transaction_id = transaction_id
@end = ending
@result = result
end
def duration
@duration ||= 1000.0 * (@end - @time)
end
def parent_of?(event)
start = (self.time - event.time) * 1000
start <= 0 && (start + duration >= event.duration)
end
end
# This is a default queue implementation that ships with Notifications. It
# consumes events in a thread and publish them to all registered subscribers.
#
class LittleFanout
def initialize
@listeners = []
end
def publish(*args)
@listeners.each { |l| l.publish(*args) }
end
def subscribe(pattern=nil, &block)
@listeners << Listener.new(pattern, &block)
def subscribe(pattern = nil, &block)
@queue.bind(pattern).subscribe(&block)
end
def wait
sleep 0.05 until drained?
@queue.wait
end
delegate :instrument, :to => :current_instrumenter
private
def drained?
@listeners.all? &:drained?
def current_instrumenter
Thread.current[:"instrumentation_#{object_id}"] ||= Notifications::Instrumenter.new(self)
end
# Used for internal implementation only.
class Listener #:nodoc:
def initialize(pattern, &block)
@pattern = pattern
@subscriber = block
@queue = Queue.new
Thread.new { consume }
end
def publish(name, *args)
if !@pattern || @pattern === name.to_s
@queue << args.unshift(name)
end
end
def consume
while args = @queue.shift
@subscriber.call(*args)
end
end
def drained?
@queue.size.zero?
end
end
end
end
Notifications.queue = Notifications::LittleFanout.new
end

View File

@ -0,0 +1,84 @@
require 'thread'
module ActiveSupport
module Notifications
# This is a default queue implementation that ships with Notifications. It
# consumes events in a thread and publish them to all registered subscribers.
#
class Fanout
def initialize
@subscribers = []
end
def bind(pattern)
Binding.new(self, pattern)
end
def subscribe(pattern = nil, &block)
@subscribers << Subscriber.new(pattern, &block)
end
def publish(*args)
@subscribers.each { |s| s.publish(*args) }
end
def wait
sleep(0.05) until @subscribers.all?(&:drained?)
end
# Used for internal implementation only.
class Binding #:nodoc:
def initialize(queue, pattern)
@queue, @pattern = queue, pattern
end
def subscribe(&block)
@queue.subscribe(@pattern, &block)
end
end
# Used for internal implementation only.
class Subscriber #:nodoc:
def initialize(pattern, &block)
@pattern =
case pattern
when Regexp, NilClass
pattern
else
/^#{Regexp.escape(pattern.to_s)}/
end
@block = block
@events = Queue.new
start_consumer
end
def publish(name, *args)
push(name, args) if matches?(name)
end
def consume
while args = @events.shift
@block.call(*args)
end
end
def drained?
@events.size.zero?
end
private
def start_consumer
Thread.new { consume }
end
def matches?(name)
!@pattern || @pattern =~ name.to_s
end
def push(name, args)
@events << args.unshift(name)
end
end
end
end
end

View File

@ -0,0 +1,47 @@
require 'active_support/secure_random'
require 'active_support/core_ext/module/delegation'
module ActiveSupport
module Notifications
class Instrumenter
def initialize(notifier)
@id = unique_id
@notifier = notifier
end
def instrument(name, payload={})
time = Time.now
result = yield if block_given?
ensure
@notifier.publish(name, time, Time.now, result, @id, payload)
end
private
def unique_id
SecureRandom.hex(10)
end
end
class Event
attr_reader :name, :time, :end, :transaction_id, :result, :payload
def initialize(name, start, ending, result, transaction_id, payload)
@name = name
@payload = payload.dup
@time = start
@transaction_id = transaction_id
@end = ending
@result = result
end
def duration
@duration ||= 1000.0 * (@end - @time)
end
def parent_of?(event)
start = (self.time - event.time) * 1000
start <= 0 && (start + duration >= event.duration)
end
end
end
end

View File

@ -1,206 +1,160 @@
require 'abstract_unit'
# Allow LittleFanout to be cleaned.
class ActiveSupport::Notifications::LittleFanout
def clear
@listeners.clear
end
end
module Notifications
class TestCase < ActiveSupport::TestCase
def setup
Thread.abort_on_exception = true
class NotificationsEventTest < Test::Unit::TestCase
def test_events_are_initialized_with_details
event = event(:foo, Time.now, Time.now + 1, 1, random_id, :payload => :bar)
assert_equal :foo, event.name
assert_equal Hash[:payload => :bar], event.payload
end
def test_events_consumes_information_given_as_payload
time = Time.now
event = event(:foo, time, time + 0.01, 1, random_id, {})
assert_equal Hash.new, event.payload
assert_equal time, event.time
assert_equal 1, event.result
assert_equal 10.0, event.duration
end
def test_event_is_parent_based_on_time_frame
time = Time.utc(2009, 01, 01, 0, 0, 1)
parent = event(:foo, Time.utc(2009), Time.utc(2009) + 100, nil, random_id, {})
child = event(:foo, time, time + 10, nil, random_id, {})
not_child = event(:foo, time, time + 100, nil, random_id, {})
assert parent.parent_of?(child)
assert !child.parent_of?(parent)
assert !parent.parent_of?(not_child)
assert !not_child.parent_of?(parent)
end
protected
def random_id
@random_id ||= ActiveSupport::SecureRandom.hex(10)
end
def event(*args)
ActiveSupport::Notifications::Event.new(*args)
end
end
class NotificationsMainTest < Test::Unit::TestCase
def setup
@events = []
Thread.abort_on_exception = true
ActiveSupport::Notifications.subscribe do |*args|
@events << ActiveSupport::Notifications::Event.new(*args)
end
end
def teardown
Thread.abort_on_exception = false
ActiveSupport::Notifications.queue.clear
end
def test_notifications_returns_action_result
result = ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do
1 + 1
@notifier = ActiveSupport::Notifications::Notifier.new
@events = []
@notifier.subscribe { |*args| @events << event(*args) }
end
assert_equal 2, result
end
def test_events_are_published_to_a_listener
ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do
1 + 1
def teardown
Thread.abort_on_exception = false
end
drain
private
def event(*args)
ActiveSupport::Notifications::Event.new(*args)
end
assert_equal 1, @events.size
assert_equal :awesome, @events.last.name
assert_equal Hash[:payload => "notifications"], @events.last.payload
def drain
@notifier.wait
end
end
def test_nested_events_can_be_instrumented
ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do
ActiveSupport::Notifications.instrument(:wot, :payload => "child") do
1 + 1
class PubSubTest < TestCase
def test_events_are_published_to_a_listener
@notifier.publish :foo
@notifier.wait
assert_equal [[:foo]], @events
end
def test_subscriber_with_pattern
events = []
@notifier.subscribe('1') { |*args| events << args }
@notifier.publish '1'
@notifier.publish '1.a'
@notifier.publish 'a.1'
@notifier.wait
assert_equal [['1'], ['1.a']], events
end
def test_subscriber_with_pattern_as_regexp
events = []
@notifier.subscribe(/\d/) { |*args| events << args }
@notifier.publish '1'
@notifier.publish 'a.1'
@notifier.publish '1.a'
@notifier.wait
assert_equal [['1'], ['a.1'], ['1.a']], events
end
def test_multiple_subscribers
@another = []
@notifier.subscribe { |*args| @another << args }
@notifier.publish :foo
@notifier.wait
assert_equal [[:foo]], @events
assert_equal [[:foo]], @another
end
private
def event(*args)
args
end
end
class InstrumentationTest < TestCase
def test_instrument_returns_block_result
assert_equal 2, @notifier.instrument(:awesome) { 1 + 1 }
end
def test_nested_events_can_be_instrumented
@notifier.instrument(:awesome, :payload => "notifications") do
@notifier.instrument(:wot, :payload => "child") do
1 + 1
end
drain
assert_equal 1, @events.size
assert_equal :wot, @events.first.name
assert_equal Hash[:payload => "child"], @events.first.payload
end
drain
assert_equal 2, @events.size
assert_equal :awesome, @events.last.name
assert_equal Hash[:payload => "notifications"], @events.last.payload
end
def test_instrument_publishes_when_exception_is_raised
begin
@notifier.instrument(:awesome, :payload => "notifications") do
raise "OMG"
end
flunk
rescue
end
drain
assert_equal 1, @events.size
assert_equal :wot, @events.first.name
assert_equal Hash[:payload => "child"], @events.first.payload
assert_equal :awesome, @events.last.name
assert_equal Hash[:payload => "notifications"], @events.last.payload
end
drain
def test_event_is_pushed_even_without_block
@notifier.instrument(:awesome, :payload => "notifications")
drain
assert_equal 2, @events.size
assert_equal :awesome, @events.last.name
assert_equal Hash[:payload => "notifications"], @events.last.payload
assert_equal 1, @events.size
assert_equal :awesome, @events.last.name
assert_equal Hash[:payload => "notifications"], @events.last.payload
end
end
def test_event_is_pushed_even_if_block_fails
ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do
raise "OMG"
end rescue RuntimeError
class EventTest < TestCase
def test_events_are_initialized_with_details
event = event(:foo, Time.now, Time.now + 1, 1, random_id, :payload => :bar)
assert_equal :foo, event.name
assert_equal Hash[:payload => :bar], event.payload
end
drain
def test_events_consumes_information_given_as_payload
time = Time.now
event = event(:foo, time, time + 0.01, 1, random_id, {})
assert_equal 1, @events.size
assert_equal :awesome, @events.last.name
assert_equal Hash[:payload => "notifications"], @events.last.payload
assert_equal Hash.new, event.payload
assert_equal time, event.time
assert_equal 1, event.result
assert_equal 10.0, event.duration
end
def test_event_is_parent_based_on_time_frame
time = Time.utc(2009, 01, 01, 0, 0, 1)
parent = event(:foo, Time.utc(2009), Time.utc(2009) + 100, nil, random_id, {})
child = event(:foo, time, time + 10, nil, random_id, {})
not_child = event(:foo, time, time + 100, nil, random_id, {})
assert parent.parent_of?(child)
assert !child.parent_of?(parent)
assert !parent.parent_of?(not_child)
assert !not_child.parent_of?(parent)
end
protected
def random_id
@random_id ||= ActiveSupport::SecureRandom.hex(10)
end
end
def test_event_is_pushed_even_without_block
ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications")
drain
assert_equal 1, @events.size
assert_equal :awesome, @events.last.name
assert_equal Hash[:payload => "notifications"], @events.last.payload
end
def test_subscribed_in_a_transaction
@another = []
ActiveSupport::Notifications.subscribe("cache") do |*args|
@another << ActiveSupport::Notifications::Event.new(*args)
end
ActiveSupport::Notifications.instrument(:cache){ 1 }
ActiveSupport::Notifications.transaction do
ActiveSupport::Notifications.instrument(:cache){ 1 }
end
ActiveSupport::Notifications.instrument(:cache){ 1 }
drain
assert_equal 3, @another.size
before, during, after = @another.map {|e| e.transaction_id }
assert_equal before, after
assert_not_equal before, during
end
def test_subscriber_with_pattern
@another = []
ActiveSupport::Notifications.subscribe("cache") do |*args|
@another << ActiveSupport::Notifications::Event.new(*args)
end
ActiveSupport::Notifications.instrument(:cache){ 1 }
drain
assert_equal 1, @another.size
assert_equal :cache, @another.first.name
assert_equal 1, @another.first.result
end
def test_subscriber_with_pattern_as_regexp
@another = []
ActiveSupport::Notifications.subscribe(/cache/) do |*args|
@another << ActiveSupport::Notifications::Event.new(*args)
end
ActiveSupport::Notifications.instrument(:something){ 0 }
ActiveSupport::Notifications.instrument(:cache){ 1 }
drain
assert_equal 1, @another.size
assert_equal :cache, @another.first.name
assert_equal 1, @another.first.result
end
def test_with_several_consumers_and_several_events
@another = []
ActiveSupport::Notifications.subscribe do |*args|
@another << ActiveSupport::Notifications::Event.new(*args)
end
1.upto(100) do |i|
ActiveSupport::Notifications.instrument(:value){ i }
end
drain
assert_equal 100, @events.size
assert_equal :value, @events.first.name
assert_equal 1, @events.first.result
assert_equal 100, @events.last.result
assert_equal 100, @another.size
assert_equal :value, @another.first.name
assert_equal 1, @another.first.result
assert_equal 100, @another.last.result
end
private
def drain
ActiveSupport::Notifications.queue.wait
end
end

View File

@ -5,21 +5,8 @@ module ApplicationTests
include ActiveSupport::Testing::Isolation
class MyQueue
attr_reader :events, :subscribers
def initialize
@events = []
@subscribers = []
@listeners = []
end
def publish(name, *args)
@events << name
end
def subscribe(listener, pattern=nil, &block)
@listeners << listener
@subscribers << pattern
raise name
end
end
@ -28,21 +15,16 @@ module ApplicationTests
boot_rails
require "rails"
require "active_support/notifications"
@events = []
Rails::Initializer.run do |c|
c.notifications.queue = MyQueue.new
c.notifications.subscribe(/listening/) do
puts "Cool"
end
c.notifications.notifier = ActiveSupport::Notifications::Notifier.new(MyQueue.new)
end
end
test "new queue is set" do
ActiveSupport::Notifications.instrument(:foo)
assert_equal :foo, ActiveSupport::Notifications.queue.events.first
end
test "configuration subscribers are loaded" do
assert_equal 1, ActiveSupport::Notifications.queue.subscribers.count { |s| s == /listening/ }
assert_raise RuntimeError do
ActiveSupport::Notifications.publish('foo')
end
end
end
end