mirror of https://github.com/rails/rails
Replaced `ActiveSupport::Concurrency::Latch` with concurrent-ruby.
The concurrent-ruby gem is a toolset containing many concurrency utilities. Many of these utilities include runtime-specific optimizations when possible. Rather than clutter the Rails codebase with concurrency utilities separate from the core task, such tools can be superseded by similar tools in the more specialized gem. This commit replaces `ActiveSupport::Concurrency::Latch` with `Concurrent::CountDownLatch`, which is functionally equivalent.
This commit is contained in:
parent
08e41a0432
commit
284a9ba8ec
|
@ -64,6 +64,7 @@ PATH
|
|||
actionpack (5.0.0.alpha)
|
||||
actionview (= 5.0.0.alpha)
|
||||
activesupport (= 5.0.0.alpha)
|
||||
concurrent-ruby (~> 0.9.0)
|
||||
rack (~> 1.6)
|
||||
rack-test (~> 0.6.3)
|
||||
rails-dom-testing (~> 1.0, >= 1.0.5)
|
||||
|
@ -85,6 +86,7 @@ PATH
|
|||
activesupport (= 5.0.0.alpha)
|
||||
arel (= 7.0.0.alpha)
|
||||
activesupport (5.0.0.alpha)
|
||||
concurrent-ruby (~> 0.9.0)
|
||||
i18n (~> 0.7)
|
||||
json (~> 1.7, >= 1.7.7)
|
||||
minitest (~> 5.1)
|
||||
|
@ -135,6 +137,7 @@ GEM
|
|||
execjs
|
||||
coffee-script-source (1.9.0)
|
||||
columnize (0.9.0)
|
||||
concurrent-ruby (0.9.0)
|
||||
connection_pool (2.1.1)
|
||||
dalli (2.7.2)
|
||||
dante (0.1.5)
|
||||
|
|
|
@ -1,3 +1,8 @@
|
|||
* Replaced `ActiveSupport::Concurrency::Latch` with `Concurrent::CountDownLatch`
|
||||
from the concurrent-ruby gem.
|
||||
|
||||
*Jerry D'Antonio*
|
||||
|
||||
* Add ability to filter parameters based on parent keys.
|
||||
|
||||
# matches {credit_card: {code: "xxxx"}}
|
||||
|
|
|
@ -26,6 +26,7 @@ Gem::Specification.new do |s|
|
|||
s.add_dependency 'rails-html-sanitizer', '~> 1.0', '>= 1.0.2'
|
||||
s.add_dependency 'rails-dom-testing', '~> 1.0', '>= 1.0.5'
|
||||
s.add_dependency 'actionview', version
|
||||
s.add_dependency 'concurrent-ruby', '~> 0.9.0'
|
||||
|
||||
s.add_development_dependency 'activemodel', version
|
||||
end
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
require 'abstract_unit'
|
||||
require 'active_support/concurrency/latch'
|
||||
require 'concurrent/atomics'
|
||||
Thread.abort_on_exception = true
|
||||
|
||||
module ActionController
|
||||
|
@ -145,7 +145,7 @@ module ActionController
|
|||
response.headers['Content-Type'] = 'text/event-stream'
|
||||
%w{ hello world }.each do |word|
|
||||
response.stream.write word
|
||||
latch.await
|
||||
latch.wait
|
||||
end
|
||||
response.stream.close
|
||||
end
|
||||
|
@ -212,7 +212,7 @@ module ActionController
|
|||
# .. plus one more, because the #each frees up a slot:
|
||||
response.stream.write '.'
|
||||
|
||||
latch.release
|
||||
latch.count_down
|
||||
|
||||
# This write will block, and eventually raise
|
||||
response.stream.write 'x'
|
||||
|
@ -233,7 +233,7 @@ module ActionController
|
|||
end
|
||||
|
||||
logger.info 'Work complete'
|
||||
latch.release
|
||||
latch.count_down
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -278,7 +278,7 @@ module ActionController
|
|||
def test_async_stream
|
||||
rubinius_skip "https://github.com/rubinius/rubinius/issues/2934"
|
||||
|
||||
@controller.latch = ActiveSupport::Concurrency::Latch.new
|
||||
@controller.latch = Concurrent::CountDownLatch.new
|
||||
parts = ['hello', 'world']
|
||||
|
||||
@controller.request = @request
|
||||
|
@ -289,8 +289,8 @@ module ActionController
|
|||
resp.stream.each do |part|
|
||||
assert_equal parts.shift, part
|
||||
ol = @controller.latch
|
||||
@controller.latch = ActiveSupport::Concurrency::Latch.new
|
||||
ol.release
|
||||
@controller.latch = Concurrent::CountDownLatch.new
|
||||
ol.count_down
|
||||
end
|
||||
}
|
||||
|
||||
|
@ -300,23 +300,23 @@ module ActionController
|
|||
end
|
||||
|
||||
def test_abort_with_full_buffer
|
||||
@controller.latch = ActiveSupport::Concurrency::Latch.new
|
||||
@controller.latch = Concurrent::CountDownLatch.new
|
||||
|
||||
@request.parameters[:format] = 'plain'
|
||||
@controller.request = @request
|
||||
@controller.response = @response
|
||||
|
||||
got_error = ActiveSupport::Concurrency::Latch.new
|
||||
got_error = Concurrent::CountDownLatch.new
|
||||
@response.stream.on_error do
|
||||
ActionController::Base.logger.warn 'Error while streaming'
|
||||
got_error.release
|
||||
got_error.count_down
|
||||
end
|
||||
|
||||
t = Thread.new(@response) { |resp|
|
||||
resp.await_commit
|
||||
_, _, body = resp.to_a
|
||||
body.each do |part|
|
||||
@controller.latch.await
|
||||
@controller.latch.wait
|
||||
body.close
|
||||
break
|
||||
end
|
||||
|
@ -325,13 +325,13 @@ module ActionController
|
|||
capture_log_output do |output|
|
||||
@controller.process :overfill_buffer_and_die
|
||||
t.join
|
||||
got_error.await
|
||||
got_error.wait
|
||||
assert_match 'Error while streaming', output.rewind && output.read
|
||||
end
|
||||
end
|
||||
|
||||
def test_ignore_client_disconnect
|
||||
@controller.latch = ActiveSupport::Concurrency::Latch.new
|
||||
@controller.latch = Concurrent::CountDownLatch.new
|
||||
|
||||
@controller.request = @request
|
||||
@controller.response = @response
|
||||
|
@ -349,7 +349,7 @@ module ActionController
|
|||
@controller.process :ignore_client_disconnect
|
||||
t.join
|
||||
Timeout.timeout(3) do
|
||||
@controller.latch.await
|
||||
@controller.latch.wait
|
||||
end
|
||||
assert_match 'Work complete', output.rewind && output.read
|
||||
end
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
require 'abstract_unit'
|
||||
require 'active_support/concurrency/latch'
|
||||
require 'concurrent/atomics'
|
||||
|
||||
module ActionController
|
||||
module Live
|
||||
|
@ -27,18 +27,18 @@ module ActionController
|
|||
end
|
||||
|
||||
def test_parallel
|
||||
latch = ActiveSupport::Concurrency::Latch.new
|
||||
latch = Concurrent::CountDownLatch.new
|
||||
|
||||
t = Thread.new {
|
||||
@response.stream.write 'foo'
|
||||
latch.await
|
||||
latch.wait
|
||||
@response.stream.close
|
||||
}
|
||||
|
||||
@response.await_commit
|
||||
@response.each do |part|
|
||||
assert_equal 'foo', part
|
||||
latch.release
|
||||
latch.count_down
|
||||
end
|
||||
assert t.join
|
||||
end
|
||||
|
@ -62,15 +62,15 @@ module ActionController
|
|||
|
||||
def test_headers_cannot_be_written_after_webserver_reads
|
||||
@response.stream.write 'omg'
|
||||
latch = ActiveSupport::Concurrency::Latch.new
|
||||
latch = Concurrent::CountDownLatch.new
|
||||
|
||||
t = Thread.new {
|
||||
@response.stream.each do |chunk|
|
||||
latch.release
|
||||
latch.count_down
|
||||
end
|
||||
}
|
||||
|
||||
latch.await
|
||||
latch.wait
|
||||
assert @response.headers.frozen?
|
||||
e = assert_raises(ActionDispatch::IllegalStateError) do
|
||||
@response.headers['Content-Length'] = "zomg"
|
||||
|
|
|
@ -1,3 +1,8 @@
|
|||
* Replaced `ActiveSupport::Concurrency::Latch` with `Concurrent::CountDownLatch`
|
||||
from the concurrent-ruby gem.
|
||||
|
||||
*Jerry D'Antonio*
|
||||
|
||||
* Fix through associations using scopes having the scope merged multiple
|
||||
times.
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
require "cases/helper"
|
||||
require 'active_support/concurrency/latch'
|
||||
require 'models/post'
|
||||
require 'models/author'
|
||||
require 'models/topic'
|
||||
|
@ -29,6 +28,7 @@ require 'models/bird'
|
|||
require 'models/car'
|
||||
require 'models/bulb'
|
||||
require 'rexml/document'
|
||||
require 'concurrent/atomics'
|
||||
|
||||
class FirstAbstractClass < ActiveRecord::Base
|
||||
self.abstract_class = true
|
||||
|
@ -1506,20 +1506,20 @@ class BasicsTest < ActiveRecord::TestCase
|
|||
orig_handler = klass.connection_handler
|
||||
new_handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
|
||||
after_handler = nil
|
||||
latch1 = ActiveSupport::Concurrency::Latch.new
|
||||
latch2 = ActiveSupport::Concurrency::Latch.new
|
||||
latch1 = Concurrent::CountDownLatch.new
|
||||
latch2 = Concurrent::CountDownLatch.new
|
||||
|
||||
t = Thread.new do
|
||||
klass.connection_handler = new_handler
|
||||
latch1.release
|
||||
latch2.await
|
||||
latch1.count_down
|
||||
latch2.wait
|
||||
after_handler = klass.connection_handler
|
||||
end
|
||||
|
||||
latch1.await
|
||||
latch1.wait
|
||||
|
||||
klass.connection_handler = orig_handler
|
||||
latch2.release
|
||||
latch2.count_down
|
||||
t.join
|
||||
|
||||
assert_equal after_handler, new_handler
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
require "cases/helper"
|
||||
require 'active_support/concurrency/latch'
|
||||
require 'concurrent/atomics'
|
||||
|
||||
module ActiveRecord
|
||||
module ConnectionAdapters
|
||||
|
@ -133,15 +133,15 @@ module ActiveRecord
|
|||
end
|
||||
|
||||
def test_reap_inactive
|
||||
ready = ActiveSupport::Concurrency::Latch.new
|
||||
ready = Concurrent::CountDownLatch.new
|
||||
@pool.checkout
|
||||
child = Thread.new do
|
||||
@pool.checkout
|
||||
@pool.checkout
|
||||
ready.release
|
||||
ready.count_down
|
||||
Thread.stop
|
||||
end
|
||||
ready.await
|
||||
ready.wait
|
||||
|
||||
assert_equal 3, active_connections(@pool).size
|
||||
|
||||
|
@ -360,13 +360,13 @@ module ActiveRecord
|
|||
def test_concurrent_connection_establishment
|
||||
assert_operator @pool.connections.size, :<=, 1
|
||||
|
||||
all_threads_in_new_connection = ActiveSupport::Concurrency::Latch.new(@pool.size - @pool.connections.size)
|
||||
all_go = ActiveSupport::Concurrency::Latch.new
|
||||
all_threads_in_new_connection = Concurrent::CountDownLatch.new(@pool.size - @pool.connections.size)
|
||||
all_go = Concurrent::CountDownLatch.new
|
||||
|
||||
@pool.singleton_class.class_eval do
|
||||
define_method(:new_connection) do
|
||||
all_threads_in_new_connection.release
|
||||
all_go.await
|
||||
all_threads_in_new_connection.count_down
|
||||
all_go.wait
|
||||
super()
|
||||
end
|
||||
end
|
||||
|
@ -381,14 +381,14 @@ module ActiveRecord
|
|||
# the kernel of the whole test is here, everything else is just scaffolding,
|
||||
# this latch will not be released unless conn. pool allows for concurrent
|
||||
# connection creation
|
||||
all_threads_in_new_connection.await
|
||||
all_threads_in_new_connection.wait
|
||||
end
|
||||
rescue Timeout::Error
|
||||
flunk 'pool unable to establish connections concurrently or implementation has ' <<
|
||||
'changed, this test then needs to patch a different :new_connection method'
|
||||
ensure
|
||||
# clean up the threads
|
||||
all_go.release
|
||||
all_go.count_down
|
||||
connecting_threads.map(&:join)
|
||||
end
|
||||
end
|
||||
|
@ -441,11 +441,11 @@ module ActiveRecord
|
|||
with_single_connection_pool do |pool|
|
||||
[:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method|
|
||||
conn = pool.connection # drain the only available connection
|
||||
second_thread_done = ActiveSupport::Concurrency::Latch.new
|
||||
second_thread_done = Concurrent::CountDownLatch.new
|
||||
|
||||
# create a first_thread and let it get into the FIFO queue first
|
||||
first_thread = Thread.new do
|
||||
pool.with_connection { second_thread_done.await }
|
||||
pool.with_connection { second_thread_done.wait }
|
||||
end
|
||||
|
||||
# wait for first_thread to get in queue
|
||||
|
@ -456,7 +456,7 @@ module ActiveRecord
|
|||
# first_thread when a connection is made available
|
||||
second_thread = Thread.new do
|
||||
pool.send(group_action_method)
|
||||
second_thread_done.release
|
||||
second_thread_done.count_down
|
||||
end
|
||||
|
||||
# wait for second_thread to get in queue
|
||||
|
@ -471,7 +471,7 @@ module ActiveRecord
|
|||
failed = true unless second_thread.join(2)
|
||||
|
||||
#--- post test clean up start
|
||||
second_thread_done.release if failed
|
||||
second_thread_done.count_down if failed
|
||||
|
||||
# after `pool.disconnect()` the first thread will be left stuck in queue, no need to wait for
|
||||
# it to timeout with ConnectionTimeoutError
|
||||
|
|
|
@ -1,3 +1,8 @@
|
|||
* Removed `ActiveSupport::Concurrency::Latch`, superseded by `Concurrent::CountDownLatch`
|
||||
from the concurrent-ruby gem.
|
||||
|
||||
*Jerry D'Antonio*
|
||||
|
||||
* Fix not calling `#default` on `HashWithIndifferentAccess#to_hash` when only
|
||||
`default_proc` is set, which could raise.
|
||||
|
||||
|
|
|
@ -25,4 +25,5 @@ Gem::Specification.new do |s|
|
|||
s.add_dependency 'tzinfo', '~> 1.1'
|
||||
s.add_dependency 'minitest', '~> 5.1'
|
||||
s.add_dependency 'thread_safe','~> 0.3', '>= 0.3.4'
|
||||
s.add_dependency 'concurrent-ruby', '~> 0.9.0'
|
||||
end
|
||||
|
|
|
@ -1,26 +1,18 @@
|
|||
require 'thread'
|
||||
require 'monitor'
|
||||
require 'concurrent/atomics'
|
||||
|
||||
module ActiveSupport
|
||||
module Concurrency
|
||||
class Latch
|
||||
def initialize(count = 1)
|
||||
@count = count
|
||||
@lock = Monitor.new
|
||||
@cv = @lock.new_cond
|
||||
end
|
||||
class Latch < Concurrent::CountDownLatch
|
||||
|
||||
def release
|
||||
@lock.synchronize do
|
||||
@count -= 1 if @count > 0
|
||||
@cv.broadcast if @count.zero?
|
||||
end
|
||||
def initialize(count = 1)
|
||||
ActiveSupport::Deprecation.warn("ActiveSupport::Concurrency::Latch is deprecated. Please use Concurrent::CountDownLatch instead.")
|
||||
super(count)
|
||||
end
|
||||
|
||||
alias_method :release, :count_down
|
||||
|
||||
def await
|
||||
@lock.synchronize do
|
||||
@cv.wait_while { @count > 0 }
|
||||
end
|
||||
wait(nil)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue