mirror of https://github.com/rails/rails
Allow subscription adapters to be shut down
This commit is contained in:
parent
b17a7e4c4d
commit
7363ad43f5
|
@ -19,6 +19,10 @@ module ActionCable
|
|||
def unsubscribe(channel, message_callback)
|
||||
raise NotImplementedError
|
||||
end
|
||||
|
||||
def shutdown
|
||||
raise NotImplementedError
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -13,6 +13,10 @@ module ActionCable
|
|||
subscriber_map.remove_subscriber(channel, callback)
|
||||
end
|
||||
|
||||
def shutdown
|
||||
# nothing to do
|
||||
end
|
||||
|
||||
private
|
||||
def subscriber_map
|
||||
@subscriber_map ||= SubscriberMap.new
|
||||
|
|
|
@ -19,6 +19,10 @@ module ActionCable
|
|||
listener.remove_subscriber(channel, callback)
|
||||
end
|
||||
|
||||
def shutdown
|
||||
listener.shutdown
|
||||
end
|
||||
|
||||
def with_connection(&block) # :nodoc:
|
||||
ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
|
||||
pg_conn = ar_conn.raw_connection
|
||||
|
@ -43,7 +47,7 @@ module ActionCable
|
|||
@adapter = adapter
|
||||
@queue = Queue.new
|
||||
|
||||
Thread.new do
|
||||
@thread = Thread.new do
|
||||
Thread.current.abort_on_exception = true
|
||||
listen
|
||||
end
|
||||
|
@ -51,26 +55,35 @@ module ActionCable
|
|||
|
||||
def listen
|
||||
@adapter.with_connection do |pg_conn|
|
||||
loop do
|
||||
until @queue.empty?
|
||||
action, channel, callback = @queue.pop(true)
|
||||
escaped_channel = pg_conn.escape_identifier(channel)
|
||||
catch :shutdown do
|
||||
loop do
|
||||
until @queue.empty?
|
||||
action, channel, callback = @queue.pop(true)
|
||||
|
||||
if action == :listen
|
||||
pg_conn.exec("LISTEN #{escaped_channel}")
|
||||
::EM.next_tick(&callback) if callback
|
||||
elsif action == :unlisten
|
||||
pg_conn.exec("UNLISTEN #{escaped_channel}")
|
||||
case action
|
||||
when :listen
|
||||
pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
|
||||
::EM.next_tick(&callback) if callback
|
||||
when :unlisten
|
||||
pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
|
||||
when :shutdown
|
||||
throw :shutdown
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
pg_conn.wait_for_notify(1) do |chan, pid, message|
|
||||
broadcast(chan, message)
|
||||
pg_conn.wait_for_notify(1) do |chan, pid, message|
|
||||
broadcast(chan, message)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def shutdown
|
||||
@queue.push([:shutdown])
|
||||
Thread.pass while @thread.alive?
|
||||
end
|
||||
|
||||
def add_channel(channel, on_success)
|
||||
@queue.push([:listen, channel, on_success])
|
||||
end
|
||||
|
|
|
@ -20,6 +20,11 @@ module ActionCable
|
|||
redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
|
||||
end
|
||||
|
||||
def shutdown
|
||||
redis_connection_for_subscriptions.pubsub.close_connection
|
||||
@redis_connection_for_subscriptions = nil
|
||||
end
|
||||
|
||||
private
|
||||
def redis_connection_for_subscriptions
|
||||
@redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
|
||||
|
|
Loading…
Reference in New Issue