mirror of https://github.com/rails/rails
Switch the default redis adapter to a single-stream model
This new adapter does get a little more intimate with the redis-rb gem's implementation than I would like, but it's the least bad of the approaches I've come up with.
This commit is contained in:
parent
6162c49e40
commit
e77368637e
1
Gemfile
1
Gemfile
|
@ -65,6 +65,7 @@ group :cable do
|
|||
gem 'puma', require: false
|
||||
|
||||
gem 'em-hiredis', require: false
|
||||
gem 'hiredis', require: false
|
||||
gem 'redis', require: false
|
||||
|
||||
gem 'faye-websocket', require: false
|
||||
|
|
|
@ -286,6 +286,7 @@ DEPENDENCIES
|
|||
delayed_job_active_record
|
||||
em-hiredis
|
||||
faye-websocket
|
||||
hiredis
|
||||
jquery-rails
|
||||
json
|
||||
kindlerb (= 0.1.1)
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
require 'thread'
|
||||
|
||||
gem 'em-hiredis', '~> 0.3.0'
|
||||
gem 'redis', '~> 3.0'
|
||||
require 'em-hiredis'
|
||||
require 'redis'
|
||||
|
||||
EventMachine.epoll if EventMachine.epoll?
|
||||
EventMachine.kqueue if EventMachine.kqueue?
|
||||
|
||||
module ActionCable
|
||||
module SubscriptionAdapter
|
||||
class EventedRedis < Base # :nodoc:
|
||||
@@mutex = Mutex.new
|
||||
|
||||
def initialize(*)
|
||||
super
|
||||
@redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil
|
||||
end
|
||||
|
||||
def broadcast(channel, payload)
|
||||
redis_connection_for_broadcasts.publish(channel, payload)
|
||||
end
|
||||
|
||||
def subscribe(channel, message_callback, success_callback = nil)
|
||||
redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
|
||||
result.callback { |reply| success_callback.call } if success_callback
|
||||
end
|
||||
end
|
||||
|
||||
def unsubscribe(channel, message_callback)
|
||||
redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
|
||||
end
|
||||
|
||||
def shutdown
|
||||
redis_connection_for_subscriptions.pubsub.close_connection
|
||||
@redis_connection_for_subscriptions = nil
|
||||
end
|
||||
|
||||
private
|
||||
def redis_connection_for_subscriptions
|
||||
ensure_reactor_running
|
||||
@redis_connection_for_subscriptions || @server.mutex.synchronize do
|
||||
@redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
|
||||
redis.on(:reconnect_failed) do
|
||||
@logger.info "[ActionCable] Redis reconnect failed."
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def redis_connection_for_broadcasts
|
||||
@redis_connection_for_broadcasts || @server.mutex.synchronize do
|
||||
@redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
|
||||
end
|
||||
end
|
||||
|
||||
def ensure_reactor_running
|
||||
return if EventMachine.reactor_running?
|
||||
@@mutex.synchronize do
|
||||
Thread.new { EventMachine.run } unless EventMachine.reactor_running?
|
||||
Thread.pass until EventMachine.reactor_running?
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,52 +1,40 @@
|
|||
require 'thread'
|
||||
|
||||
gem 'em-hiredis', '~> 0.3.0'
|
||||
gem 'redis', '~> 3.0'
|
||||
require 'em-hiredis'
|
||||
require 'redis'
|
||||
|
||||
EventMachine.epoll if EventMachine.epoll?
|
||||
EventMachine.kqueue if EventMachine.kqueue?
|
||||
|
||||
module ActionCable
|
||||
module SubscriptionAdapter
|
||||
class Redis < Base # :nodoc:
|
||||
@@mutex = Mutex.new
|
||||
|
||||
def initialize(*)
|
||||
super
|
||||
@redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil
|
||||
@listener = nil
|
||||
@redis_connection_for_broadcasts = nil
|
||||
end
|
||||
|
||||
def broadcast(channel, payload)
|
||||
redis_connection_for_broadcasts.publish(channel, payload)
|
||||
end
|
||||
|
||||
def subscribe(channel, message_callback, success_callback = nil)
|
||||
redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
|
||||
result.callback { |reply| success_callback.call } if success_callback
|
||||
end
|
||||
def subscribe(channel, callback, success_callback = nil)
|
||||
listener.add_subscriber(channel, callback, success_callback)
|
||||
end
|
||||
|
||||
def unsubscribe(channel, message_callback)
|
||||
redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
|
||||
def unsubscribe(channel, callback)
|
||||
listener.remove_subscriber(channel, callback)
|
||||
end
|
||||
|
||||
def shutdown
|
||||
redis_connection_for_subscriptions.pubsub.close_connection
|
||||
@redis_connection_for_subscriptions = nil
|
||||
@listener.shutdown if @listener
|
||||
end
|
||||
|
||||
def redis_connection_for_subscriptions
|
||||
::Redis.new(@server.config.cable)
|
||||
end
|
||||
|
||||
private
|
||||
def redis_connection_for_subscriptions
|
||||
ensure_reactor_running
|
||||
@redis_connection_for_subscriptions || @server.mutex.synchronize do
|
||||
@redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
|
||||
redis.on(:reconnect_failed) do
|
||||
@logger.info "[ActionCable] Redis reconnect failed."
|
||||
end
|
||||
end
|
||||
end
|
||||
def listener
|
||||
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
|
||||
end
|
||||
|
||||
def redis_connection_for_broadcasts
|
||||
|
@ -55,12 +43,120 @@ module ActionCable
|
|||
end
|
||||
end
|
||||
|
||||
def ensure_reactor_running
|
||||
return if EventMachine.reactor_running?
|
||||
@@mutex.synchronize do
|
||||
Thread.new { EventMachine.run } unless EventMachine.reactor_running?
|
||||
Thread.pass until EventMachine.reactor_running?
|
||||
class Listener < SubscriberMap
|
||||
def initialize(adapter)
|
||||
super()
|
||||
|
||||
@adapter = adapter
|
||||
|
||||
@subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
|
||||
@subscription_lock = Mutex.new
|
||||
|
||||
@raw_client = nil
|
||||
|
||||
@when_connected = []
|
||||
|
||||
@thread = nil
|
||||
end
|
||||
|
||||
def listen(conn)
|
||||
conn.without_reconnect do
|
||||
original_client = conn.client
|
||||
|
||||
conn.subscribe('_action_cable_internal') do |on|
|
||||
on.subscribe do |chan, count|
|
||||
@subscription_lock.synchronize do
|
||||
if count == 1
|
||||
@raw_client = original_client
|
||||
|
||||
until @when_connected.empty?
|
||||
@when_connected.shift.call
|
||||
end
|
||||
end
|
||||
|
||||
if callbacks = @subscribe_callbacks[chan]
|
||||
next_callback = callbacks.shift
|
||||
Concurrent.global_io_executor << next_callback if next_callback
|
||||
@subscribe_callbacks.delete(chan) if callbacks.empty?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
on.message do |chan, message|
|
||||
broadcast(chan, message)
|
||||
end
|
||||
|
||||
on.unsubscribe do |chan, count|
|
||||
if count == 0
|
||||
@subscription_lock.synchronize do
|
||||
@raw_client = nil
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def shutdown
|
||||
@subscription_lock.synchronize do
|
||||
return if @thread.nil?
|
||||
|
||||
when_connected do
|
||||
send_command('unsubscribe')
|
||||
@raw_client = nil
|
||||
end
|
||||
end
|
||||
|
||||
Thread.pass while @thread.alive?
|
||||
end
|
||||
|
||||
def add_channel(channel, on_success)
|
||||
@subscription_lock.synchronize do
|
||||
ensure_listener_running
|
||||
@subscribe_callbacks[channel] << on_success
|
||||
when_connected { send_command('subscribe', channel) }
|
||||
end
|
||||
end
|
||||
|
||||
def remove_channel(channel)
|
||||
@subscription_lock.synchronize do
|
||||
when_connected { send_command('unsubscribe', channel) }
|
||||
end
|
||||
end
|
||||
|
||||
def invoke_callback(*)
|
||||
Concurrent.global_io_executor.post { super }
|
||||
end
|
||||
|
||||
private
|
||||
def ensure_listener_running
|
||||
@thread ||= Thread.new do
|
||||
Thread.current.abort_on_exception = true
|
||||
|
||||
conn = @adapter.redis_connection_for_subscriptions
|
||||
listen conn
|
||||
end
|
||||
end
|
||||
|
||||
def when_connected(&block)
|
||||
if @raw_client
|
||||
block.call
|
||||
else
|
||||
@when_connected << block
|
||||
end
|
||||
end
|
||||
|
||||
def send_command(*command)
|
||||
@raw_client.write(command)
|
||||
|
||||
very_raw_connection =
|
||||
@raw_client.connection.instance_variable_defined?(:@connection) &&
|
||||
@raw_client.connection.instance_variable_get(:@connection)
|
||||
|
||||
if very_raw_connection && very_raw_connection.respond_to?(:flush)
|
||||
very_raw_connection.flush
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
require 'test_helper'
|
||||
require_relative './common'
|
||||
|
||||
class EventedRedisAdapterTest < ActionCable::TestCase
|
||||
include CommonSubscriptionAdapterTest
|
||||
|
||||
def cable_config
|
||||
{ adapter: 'evented_redis', url: 'redis://127.0.0.1:6379/12' }
|
||||
end
|
||||
end
|
|
@ -5,6 +5,12 @@ class RedisAdapterTest < ActionCable::TestCase
|
|||
include CommonSubscriptionAdapterTest
|
||||
|
||||
def cable_config
|
||||
{ adapter: 'redis', url: 'redis://127.0.0.1:6379/12' }
|
||||
{ adapter: 'redis', driver: 'ruby', url: 'redis://127.0.0.1:6379/12' }
|
||||
end
|
||||
end
|
||||
|
||||
class RedisAdapterTest::Hiredis < RedisAdapterTest
|
||||
def cable_config
|
||||
super.merge(driver: 'hiredis')
|
||||
end
|
||||
end
|
||||
|
|
|
@ -344,7 +344,6 @@ module Rails
|
|||
return [] if options[:skip_action_cable]
|
||||
comment = 'Action Cable dependencies for the Redis adapter'
|
||||
gems = []
|
||||
gems << GemfileEntry.new("em-hiredis", '~> 0.3.0', comment)
|
||||
gems << GemfileEntry.new("redis", '~> 3.0', comment)
|
||||
gems
|
||||
end
|
||||
|
|
|
@ -400,7 +400,6 @@ class AppGeneratorTest < Rails::Generators::TestCase
|
|||
assert_no_match(/action_cable_meta_tag/, content)
|
||||
end
|
||||
assert_file "Gemfile" do |content|
|
||||
assert_no_match(/em-hiredis/, content)
|
||||
assert_no_match(/redis/, content)
|
||||
end
|
||||
end
|
||||
|
@ -412,14 +411,12 @@ class AppGeneratorTest < Rails::Generators::TestCase
|
|||
assert_no_file "app/assets/javascripts/cable.coffee"
|
||||
assert_no_file "app/channels"
|
||||
assert_file "Gemfile" do |content|
|
||||
assert_no_match(/em-hiredis/, content)
|
||||
assert_no_match(/redis/, content)
|
||||
end
|
||||
end
|
||||
|
||||
def test_action_cable_redis_gems
|
||||
run_generator
|
||||
assert_gem 'em-hiredis'
|
||||
assert_gem 'redis'
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in New Issue