Move async execution from celluloid to concurrent-ruby

This removes 8 runtime gem dependencies from Rails:

```
Using hitimes 1.2.3
Using timers 4.1.1
Using celluloid-essentials 0.20.5
Using celluloid-extras 0.20.5
Using celluloid-fsm 0.20.5
Using celluloid-pool 0.20.5
Using celluloid-supervision 0.20.5
Using celluloid 0.17.2
```
This commit is contained in:
Mike Perham 2016-01-05 14:31:16 -08:00
parent b7b508aa79
commit 547713b4c9
9 changed files with 49 additions and 31 deletions

View File

@ -31,8 +31,8 @@ PATH
specs: specs:
actioncable (5.0.0.beta1) actioncable (5.0.0.beta1)
actionpack (= 5.0.0.beta1) actionpack (= 5.0.0.beta1)
celluloid (~> 0.17.2)
coffee-rails (~> 4.1.0) coffee-rails (~> 4.1.0)
concurrent-ruby (~> 1.0.0)
em-hiredis (~> 0.3.0) em-hiredis (~> 0.3.0)
faye-websocket (~> 0.10.0) faye-websocket (~> 0.10.0)
redis (~> 3.0) redis (~> 3.0)

View File

@ -23,7 +23,7 @@ Gem::Specification.new do |s|
s.add_dependency 'coffee-rails', '~> 4.1.0' s.add_dependency 'coffee-rails', '~> 4.1.0'
s.add_dependency 'faye-websocket', '~> 0.10.0' s.add_dependency 'faye-websocket', '~> 0.10.0'
s.add_dependency 'websocket-driver', '~> 0.6.1' s.add_dependency 'websocket-driver', '~> 0.6.1'
s.add_dependency 'celluloid', '~> 0.17.2' s.add_dependency 'concurrent-ruby', '~> 1.0.0'
s.add_dependency 'em-hiredis', '~> 0.3.0' s.add_dependency 'em-hiredis', '~> 0.3.0'
s.add_dependency 'redis', '~> 3.0' s.add_dependency 'redis', '~> 3.0'

View File

@ -28,7 +28,7 @@ module ActionCable
def start_periodic_timers def start_periodic_timers
self.class.periodic_timers.each do |callback, options| self.class.periodic_timers.each do |callback, options|
active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do
connection.worker_pool.async.run_periodic_timer(self, callback) connection.worker_pool.async_run_periodic_timer(self, callback)
end end
end end
end end

View File

@ -103,7 +103,7 @@ module ActionCable
# Invoke a method on the connection asynchronously through the pool of thread workers. # Invoke a method on the connection asynchronously through the pool of thread workers.
def send_async(method, *arguments) def send_async(method, *arguments)
worker_pool.async.invoke(self, method, *arguments) worker_pool.async_invoke(self, method, *arguments)
end end
# Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`. # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.

View File

@ -1,10 +1,7 @@
require 'action_cable/server' require 'action_cable/server'
require 'eventmachine' require 'eventmachine'
require 'celluloid'
EM.error_handler do |e| EM.error_handler do |e|
puts "Error raised inside the event loop: #{e.message}" puts "Error raised inside the event loop: #{e.message}"
puts e.backtrace.join("\n") puts e.backtrace.join("\n")
end end
Celluloid.logger = ActionCable.server.logger

View File

@ -1,6 +1,3 @@
# FIXME: Cargo culted fix from https://github.com/celluloid/celluloid-pool/issues/10
require 'celluloid/current'
require 'em-hiredis' require 'em-hiredis'
module ActionCable module ActionCable

View File

@ -1,19 +1,36 @@
require 'celluloid'
require 'active_support/callbacks' require 'active_support/callbacks'
require 'concurrent'
module ActionCable module ActionCable
module Server module Server
# Worker used by Server.send_async to do connection work in threads. Only for internal use. # Worker used by Server.send_async to do connection work in threads. Only for internal use.
class Worker class Worker
include ActiveSupport::Callbacks include ActiveSupport::Callbacks
include Celluloid
attr_reader :connection
define_callbacks :work define_callbacks :work
include ActiveRecordConnectionManagement include ActiveRecordConnectionManagement
def initialize(max_size=5)
@pool = Concurrent::ThreadPoolExecutor.new(
min_threads: 1,
max_threads: max_size,
max_queue: 0,
)
end
def connection
Thread.current[:connection] || raise("No connection set")
end
def async_invoke(receiver, method, *args)
@pool.post do
invoke(receiver, method, *args)
end
end
def invoke(receiver, method, *args) def invoke(receiver, method, *args)
@connection = receiver begin
Thread.current[:connection] = receiver
run_callbacks :work do run_callbacks :work do
receiver.send method, *args receiver.send method, *args
@ -23,17 +40,31 @@ module ActionCable
logger.error e.backtrace.join("\n") logger.error e.backtrace.join("\n")
receiver.handle_exception if receiver.respond_to?(:handle_exception) receiver.handle_exception if receiver.respond_to?(:handle_exception)
ensure
Thread.current[:connection] = nil
end
end
def async_run_periodic_timer(channel, callback)
@pool.post do
run_periodic_timer(channel, callback)
end
end end
def run_periodic_timer(channel, callback) def run_periodic_timer(channel, callback)
@connection = channel.connection begin
Thread.current[:connection] = channel.connection
run_callbacks :work do run_callbacks :work do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end end
ensure
Thread.current[:connection] = nil
end
end end
private private
def logger def logger
ActionCable.server.logger ActionCable.server.logger
end end

View File

@ -14,11 +14,6 @@ require 'rack/mock'
# Require all the stubs and models # Require all the stubs and models
Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file } Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file }
$CELLULOID_DEBUG = false
$CELLULOID_TEST = false
require 'celluloid'
Celluloid.logger = Logger.new(StringIO.new)
require 'faye/websocket' require 'faye/websocket'
class << Faye::WebSocket class << Faye::WebSocket
remove_method :ensure_reactor_running remove_method :ensure_reactor_running

View File

@ -17,8 +17,6 @@ class WorkerTest < ActiveSupport::TestCase
end end
setup do setup do
Celluloid.boot
@worker = ActionCable::Server::Worker.new @worker = ActionCable::Server::Worker.new
@receiver = Receiver.new @receiver = Receiver.new
end end