mirror of https://github.com/rails/rails
Merge pull request #24224 from danielrhodes/actioncable-websocket-protocols
ActionCable protocol negotiation
This commit is contained in:
commit
fa3537506a
|
@ -1,3 +1,23 @@
|
|||
* WebSocket protocol negotiation.
|
||||
|
||||
Introduces an Action Cable protocol version that moves independently
|
||||
of and, hopefully, more slowly than Action Cable itself. Client sockets
|
||||
negotiate a protocol with the Cable server using WebSockets' native
|
||||
subprotocol support:
|
||||
* https://tools.ietf.org/html/rfc6455#section-1.9
|
||||
* https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers#Subprotocols
|
||||
|
||||
If they can't negotiate a compatible protocol (usually due to upgrading
|
||||
the Cable server with a browser still running old JavaScript) then the
|
||||
client knows to disconnect, cease retrying, and tell the app that it hit
|
||||
a protocol mismatch.
|
||||
|
||||
This allows us to evolve the Action Cable message format, handshaking,
|
||||
pings, acknowledgements, and more without breaking older clients'
|
||||
expectations of server behavior.
|
||||
|
||||
*Daniel Rhodes*
|
||||
|
||||
* Pubsub: automatic stream decoding.
|
||||
|
||||
stream_for @room, coder: ActiveSupport::JSON do |message|
|
||||
|
|
|
@ -2,7 +2,8 @@
|
|||
|
||||
# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation.
|
||||
|
||||
{message_types} = ActionCable.INTERNAL
|
||||
{message_types, protocols} = ActionCable.INTERNAL
|
||||
[supportedProtocols..., unsupportedProtocol] = protocols
|
||||
|
||||
class ActionCable.Connection
|
||||
@reopenDelay: 500
|
||||
|
@ -10,6 +11,7 @@ class ActionCable.Connection
|
|||
constructor: (@consumer) ->
|
||||
{@subscriptions} = @consumer
|
||||
@monitor = new ActionCable.ConnectionMonitor this
|
||||
@disconnected = true
|
||||
|
||||
send: (data) ->
|
||||
if @isOpen()
|
||||
|
@ -23,15 +25,16 @@ class ActionCable.Connection
|
|||
ActionCable.log("Attempted to open WebSocket, but existing socket is #{@getState()}")
|
||||
throw new Error("Existing connection must be closed before opening")
|
||||
else
|
||||
ActionCable.log("Opening WebSocket, current state is #{@getState()}")
|
||||
ActionCable.log("Opening WebSocket, current state is #{@getState()}, subprotocols: #{protocols}")
|
||||
@uninstallEventHandlers() if @webSocket?
|
||||
@webSocket = new WebSocket(@consumer.url)
|
||||
@webSocket = new WebSocket(@consumer.url, protocols)
|
||||
@installEventHandlers()
|
||||
@monitor.start()
|
||||
true
|
||||
|
||||
close: ->
|
||||
@webSocket?.close()
|
||||
close: ({allowReconnect} = {allowReconnect: true}) ->
|
||||
@monitor.stop() unless allowReconnect
|
||||
@webSocket?.close() if @isActive()
|
||||
|
||||
reopen: ->
|
||||
ActionCable.log("Reopening WebSocket, current state is #{@getState()}")
|
||||
|
@ -46,6 +49,9 @@ class ActionCable.Connection
|
|||
else
|
||||
@open()
|
||||
|
||||
getProtocol: ->
|
||||
@webSocket?.protocol
|
||||
|
||||
isOpen: ->
|
||||
@isState("open")
|
||||
|
||||
|
@ -54,6 +60,9 @@ class ActionCable.Connection
|
|||
|
||||
# Private
|
||||
|
||||
isProtocolSupported: ->
|
||||
@getProtocol() in supportedProtocols
|
||||
|
||||
isState: (states...) ->
|
||||
@getState() in states
|
||||
|
||||
|
@ -74,10 +83,12 @@ class ActionCable.Connection
|
|||
|
||||
events:
|
||||
message: (event) ->
|
||||
return unless @isSupportedProtocol()
|
||||
{identifier, message, type} = JSON.parse(event.data)
|
||||
switch type
|
||||
when message_types.welcome
|
||||
@monitor.recordConnect()
|
||||
@subscriptions.reload()
|
||||
when message_types.ping
|
||||
@monitor.recordPing()
|
||||
when message_types.confirmation
|
||||
|
@ -88,20 +99,18 @@ class ActionCable.Connection
|
|||
@subscriptions.notify(identifier, "received", message)
|
||||
|
||||
open: ->
|
||||
ActionCable.log("WebSocket onopen event")
|
||||
ActionCable.log("WebSocket onopen event, using '#{@getProtocol()}' subprotocol")
|
||||
@disconnected = false
|
||||
@subscriptions.reload()
|
||||
if not @isProtocolSupported()
|
||||
ActionCable.log("Protocol is unsupported. Stopping monitor and disconnecting.")
|
||||
@close(allowReconnect: false)
|
||||
|
||||
close: ->
|
||||
close: (event) ->
|
||||
ActionCable.log("WebSocket onclose event")
|
||||
@disconnect()
|
||||
return if @disconnected
|
||||
@disconnected = true
|
||||
@monitor.recordDisconnect()
|
||||
@subscriptions.notifyAll("disconnected", {willAttemptReconnect: @monitor.isRunning()})
|
||||
|
||||
error: ->
|
||||
ActionCable.log("WebSocket onerror event")
|
||||
@disconnect()
|
||||
|
||||
disconnect: ->
|
||||
return if @disconnected
|
||||
@disconnected = true
|
||||
@subscriptions.notifyAll("disconnected")
|
||||
@monitor.recordDisconnect()
|
||||
|
|
|
@ -14,6 +14,19 @@
|
|||
# App.appearance = App.cable.subscriptions.create "AppearanceChannel"
|
||||
#
|
||||
# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription.
|
||||
#
|
||||
# When a consumer is created, it automatically connects with the server.
|
||||
#
|
||||
# To disconnect from the server, call
|
||||
#
|
||||
# App.cable.disconnect()
|
||||
#
|
||||
# and to restart the connection:
|
||||
#
|
||||
# App.cable.connect()
|
||||
#
|
||||
# Any channel subscriptions which existed prior to disconnecting will
|
||||
# automatically resubscribe.
|
||||
class ActionCable.Consumer
|
||||
constructor: (@url) ->
|
||||
@subscriptions = new ActionCable.Subscriptions this
|
||||
|
@ -22,6 +35,12 @@ class ActionCable.Consumer
|
|||
send: (data) ->
|
||||
@connection.send(data)
|
||||
|
||||
connect: ->
|
||||
@connection.open()
|
||||
|
||||
disconnect: ->
|
||||
@connection.close(allowReconnect: false)
|
||||
|
||||
ensureActiveConnection: ->
|
||||
unless @connection.isActive()
|
||||
@connection.open()
|
||||
|
|
|
@ -8,6 +8,12 @@
|
|||
# connected: ->
|
||||
# # Called once the subscription has been successfully completed
|
||||
#
|
||||
# disconnected: ({ willAttemptReconnect: boolean }) ->
|
||||
# # Called when the client has disconnected with the server.
|
||||
# # The object will have an `willAttemptReconnect` property which
|
||||
# # says whether the client has the intention of attempting
|
||||
# # to reconnect.
|
||||
#
|
||||
# appear: ->
|
||||
# @perform 'appear', appearing_on: @appearingOn()
|
||||
#
|
||||
|
|
|
@ -35,7 +35,8 @@ module ActionCable
|
|||
confirmation: 'confirm_subscription'.freeze,
|
||||
rejection: 'reject_subscription'.freeze
|
||||
},
|
||||
default_mount_path: '/cable'.freeze
|
||||
default_mount_path: '/cable'.freeze,
|
||||
protocols: ["actioncable-v1-json".freeze, "actioncable-unsupported".freeze].freeze
|
||||
}
|
||||
|
||||
# Singleton instance of the server
|
||||
|
|
|
@ -48,7 +48,7 @@ module ActionCable
|
|||
include InternalChannel
|
||||
include Authorization
|
||||
|
||||
attr_reader :server, :env, :subscriptions, :logger, :worker_pool
|
||||
attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol
|
||||
delegate :event_loop, :pubsub, to: :server
|
||||
|
||||
def initialize(server, env, coder: ActiveSupport::JSON)
|
||||
|
@ -163,6 +163,7 @@ module ActionCable
|
|||
end
|
||||
|
||||
def handle_open
|
||||
@protocol = websocket.protocol
|
||||
connect if respond_to?(:connect)
|
||||
subscribe_to_internal_channel
|
||||
send_welcome_message
|
||||
|
|
|
@ -29,7 +29,7 @@ module ActionCable
|
|||
|
||||
attr_reader :env, :url
|
||||
|
||||
def initialize(env, event_target, event_loop)
|
||||
def initialize(env, event_target, event_loop, protocols)
|
||||
@env = env
|
||||
@event_target = event_target
|
||||
@event_loop = event_loop
|
||||
|
@ -42,7 +42,7 @@ module ActionCable
|
|||
@ready_state = CONNECTING
|
||||
|
||||
# The driver calls +env+, +url+, and +write+
|
||||
@driver = ::WebSocket::Driver.rack(self)
|
||||
@driver = ::WebSocket::Driver.rack(self, protocols: protocols)
|
||||
|
||||
@driver.on(:open) { |e| open }
|
||||
@driver.on(:message) { |e| receive_message(e.data) }
|
||||
|
@ -111,6 +111,10 @@ module ActionCable
|
|||
@ready_state == OPEN
|
||||
end
|
||||
|
||||
def protocol
|
||||
@driver.protocol
|
||||
end
|
||||
|
||||
private
|
||||
def open
|
||||
return unless @ready_state == CONNECTING
|
||||
|
|
|
@ -3,9 +3,10 @@ require 'faye/websocket'
|
|||
module ActionCable
|
||||
module Connection
|
||||
class FayeClientSocket
|
||||
def initialize(env, event_target, stream_event_loop)
|
||||
def initialize(env, event_target, stream_event_loop, protocols)
|
||||
@env = env
|
||||
@event_target = event_target
|
||||
@protocols = protocols
|
||||
|
||||
@faye = nil
|
||||
end
|
||||
|
@ -23,6 +24,10 @@ module ActionCable
|
|||
@faye && @faye.close
|
||||
end
|
||||
|
||||
def protocol
|
||||
@faye && @faye.protocol
|
||||
end
|
||||
|
||||
def rack_response
|
||||
connect
|
||||
@faye.rack_response
|
||||
|
@ -31,7 +36,7 @@ module ActionCable
|
|||
private
|
||||
def connect
|
||||
return if @faye
|
||||
@faye = Faye::WebSocket.new(@env)
|
||||
@faye = Faye::WebSocket.new(@env, @protocols)
|
||||
|
||||
@faye.on(:open) { |event| @event_target.on_open }
|
||||
@faye.on(:message) { |event| @event_target.on_message(event.data) }
|
||||
|
|
|
@ -4,8 +4,8 @@ module ActionCable
|
|||
module Connection
|
||||
# Wrap the real socket to minimize the externally-presented API
|
||||
class WebSocket
|
||||
def initialize(env, event_target, event_loop, client_socket_class)
|
||||
@websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop) : nil
|
||||
def initialize(env, event_target, event_loop, client_socket_class, protocols: ActionCable::INTERNAL[:protocols])
|
||||
@websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop, protocols) : nil
|
||||
end
|
||||
|
||||
def possible?
|
||||
|
@ -24,6 +24,10 @@ module ActionCable
|
|||
websocket.close
|
||||
end
|
||||
|
||||
def protocol
|
||||
websocket.protocol
|
||||
end
|
||||
|
||||
def rack_response
|
||||
websocket.rack_response
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue