Implement an optimized Cache::Entry coder

Active Support's cache have for long been limited because
of its format. It directly serialize its `Entry` object with
`Marshal`, so any internal change might break the format.

The current shortcommings are:

  - The minimum entry overhead is quite ridiculous:
    `Marshal.dump(ActiveSupport::Cache::Entry.new("")).bytesize # => 107`
  - Only the internal `value` is compressed, but unless it's a String, to do so
    it first need to be serialized. So we end up with `Marshal.dump(Zlib.deflate(Marshal.dump(value)))`
    which is wasteful.
This commit is contained in:
Jean Boussier 2020-10-19 20:32:38 +02:00
parent 79744bc335
commit 3b71f3eb68
12 changed files with 311 additions and 42 deletions

View File

@ -93,6 +93,14 @@ module ActiveSupport
self.test_parallelization_disabled = true unless ENV["PARALLEL_WORKERS"]
end
def self.cache_format_version
Cache.format_version
end
def self.cache_format_version=(value)
Cache.format_version = value
end
def self.to_time_preserves_timezone
DateAndTime::Compatibility.preserve_timezone
end

View File

@ -35,7 +35,11 @@ module ActiveSupport
autoload :LocalCache, "active_support/cache/strategy/local_cache"
end
@format_version = 6.1
class << self
attr_accessor :format_version
# Creates a new Store object according to the given options.
#
# If no arguments are passed to this method, then a new
@ -171,24 +175,6 @@ module ActiveSupport
# threshold is configurable with the <tt>:compress_threshold</tt> option,
# specified in bytes.
class Store
module MarshalCoder # :nodoc:
extend self
def dump(entry)
Marshal.dump(entry)
end
def dump_compressed(entry, threshold)
Marshal.dump(entry.compressed(threshold))
end
def load(payload)
Marshal.load(payload)
end
end
DEFAULT_CODER = MarshalCoder
cattr_accessor :logger, instance_writer: true
attr_reader :silence, :options
@ -219,7 +205,7 @@ module ActiveSupport
@options[:compress] = true unless @options.key?(:compress)
@options[:compress_threshold] = DEFAULT_COMPRESS_LIMIT unless @options.key?(:compress_threshold)
@coder = @options.delete(:coder) { self.class::DEFAULT_CODER } || NullCoder
@coder = @options.delete(:coder) { default_coder } || NullCoder
@coder_supports_compression = @coder.respond_to?(:dump_compressed)
end
@ -600,6 +586,10 @@ module ActiveSupport
end
private
def default_coder
Coders[Cache.format_version]
end
# Adds the namespace defined in the options to a pattern designed to
# match keys. Implementations that support delete_matched should call
# this method to translate a pattern that matches names into one that
@ -705,6 +695,7 @@ module ActiveSupport
options[canonical_name] ||= options[alias_key] if alias_key
options.except!(*aliases)
end
options
end
@ -831,6 +822,76 @@ module ActiveSupport
end
end
module Coders # :nodoc:
MARK_61 = "\x04\b".b.freeze # The one set by Marshal.
MARK_70_UNCOMPRESSED = "\x00".b.freeze
MARK_70_COMPRESSED = "\x01".b.freeze
class << self
def [](version)
case version
when 6.1
Rails61Coder
when 7.0
Rails70Coder
else
raise ArgumentError, "Unknown ActiveSupport::Cache.format_version #{Cache.format_version.inspect}"
end
end
end
module Loader
extend self
def load(payload)
if payload.start_with?(MARK_70_UNCOMPRESSED)
members = Marshal.load(payload.byteslice(1..-1))
elsif payload.start_with?(MARK_70_COMPRESSED)
members = Marshal.load(Zlib::Inflate.inflate(payload.byteslice(1..-1)))
elsif payload.start_with?(MARK_61)
return Marshal.load(payload)
else
raise ArgumentError, %{Invalid cache prefix: #{payload.byteslice(0).inspect}, expected "\\x00" or "\\x01"}
end
Entry.unpack(members)
end
end
module Rails61Coder
include Loader
extend self
def dump(entry)
Marshal.dump(entry)
end
def dump_compressed(entry, threshold)
Marshal.dump(entry.compressed(threshold))
end
end
module Rails70Coder
include Loader
extend self
def dump(entry)
MARK_70_UNCOMPRESSED + Marshal.dump(entry.pack)
end
def dump_compressed(entry, threshold)
payload = Marshal.dump(entry.pack)
if payload.bytesize >= threshold
compressed_payload = Zlib::Deflate.deflate(payload)
if compressed_payload.bytesize < payload.bytesize
return MARK_70_COMPRESSED + compressed_payload
end
end
MARK_70_UNCOMPRESSED + payload
end
end
end
# This class is used to represent cache entries. Cache entries have a value, an optional
# expiration time, and an optional version. The expiration time is used to support the :race_condition_ttl option
# on the cache. The version is used to support the :version option on the cache for rejecting
@ -839,6 +900,12 @@ module ActiveSupport
# Since cache entries in most instances will be serialized, the internals of this class are highly optimized
# using short instance variable names that are lazily defined.
class Entry # :nodoc:
class << self
def unpack(members)
new(members[0], expires_at: members[1], version: members[2])
end
end
attr_reader :version
# Creates a new cache entry for the specified value. Options supported are
@ -848,7 +915,6 @@ module ActiveSupport
@version = version
@created_at = 0.0
@expires_in = expires_at&.to_f || expires_in && (expires_in.to_f + Time.now.to_f)
@compressed = true if compressed
end
@ -935,6 +1001,12 @@ module ActiveSupport
end
end
def pack
members = [value, expires_at, version]
members.pop while !members.empty? && members.last.nil?
members
end
private
def uncompress(value)
Marshal.load(Zlib::Inflate.inflate(value))

View File

@ -25,8 +25,6 @@ module ActiveSupport
# MemCacheStore implements the Strategy::LocalCache strategy which implements
# an in-memory cache inside of a block.
class MemCacheStore < Store
DEFAULT_CODER = NullCoder # Dalli automatically Marshal values
# Provide support for raw values in the local cache strategy.
module LocalCacheWithRaw # :nodoc:
private
@ -140,9 +138,59 @@ module ActiveSupport
end
private
module Coders # :nodoc:
class << self
def [](version)
case version
when 6.1
Rails61Coder
when 7.0
Rails70Coder
else
raise ArgumentError, "Unknown ActiveSupport::Cache.format_version #{Cache.format_version.inspect}"
end
end
end
module Loader
def load(payload)
if payload.is_a?(Entry)
payload
else
Cache::Coders::Loader.load(payload)
end
end
end
module Rails61Coder
include Loader
extend self
def dump(entry)
entry
end
def dump_compressed(entry, threshold)
entry.compressed(threshold)
end
end
module Rails70Coder
include Cache::Coders::Rails70Coder
include Loader
extend self
end
end
def default_coder
Coders[Cache.format_version]
end
# Read an entry from the cache.
def read_entry(key, **options)
rescue_error_with(nil) { deserialize_entry(@data.with { |c| c.get(key, options) }) }
rescue_error_with(nil) do
deserialize_entry(@data.with { |c| c.get(key, options) }, raw: options[:raw])
end
end
# Write an entry to the cache.
@ -168,7 +216,7 @@ module ActiveSupport
values = {}
raw_values.each do |key, value|
entry = deserialize_entry(value)
entry = deserialize_entry(value, raw: options[:raw])
unless entry.expired? || entry.mismatched?(normalize_version(keys_to_names[key], options))
values[keys_to_names[key]] = entry.value
@ -196,10 +244,12 @@ module ActiveSupport
key
end
def deserialize_entry(payload)
entry = super
entry = Entry.new(entry) if entry && !entry.is_a?(Entry)
entry
def deserialize_entry(payload, raw:)
if payload && raw
Entry.new(payload)
else
super(payload)
end
end
def rescue_error_with(fallback)

View File

@ -45,8 +45,6 @@ module ActiveSupport
end
end
DEFAULT_CODER = DupCoder
def initialize(options = nil)
options ||= {}
# Disable compression by default.
@ -145,6 +143,10 @@ module ActiveSupport
private
PER_ENTRY_OVERHEAD = 240
def default_coder
DupCoder
end
def cached_size(key, payload)
key.to_s.bytesize + payload.bytesize + PER_ENTRY_OVERHEAD
end

View File

@ -168,7 +168,7 @@ module ActiveSupport
# Race condition TTL is not set by default. This can be used to avoid
# "thundering herd" cache writes when hot cache entries are expired.
# See <tt>ActiveSupport::Cache::Store#fetch</tt> for more.
def initialize(namespace: nil, compress: true, compress_threshold: 1.kilobyte, coder: DEFAULT_CODER, expires_in: nil, race_condition_ttl: nil, error_handler: DEFAULT_ERROR_HANDLER, **redis_options)
def initialize(namespace: nil, compress: true, compress_threshold: 1.kilobyte, coder: default_coder, expires_in: nil, race_condition_ttl: nil, error_handler: DEFAULT_ERROR_HANDLER, **redis_options)
@redis_options = redis_options
@max_key_bytesize = MAX_KEY_BYTESIZE
@ -433,7 +433,10 @@ module ActiveSupport
if entries.any?
if mset_capable? && expires_in.nil?
failsafe :write_multi_entries do
redis.with { |c| c.mapped_mset(serialize_entries(entries, **options)) }
payload = serialize_entries(entries, **options)
redis.with do |c|
c.mapped_mset(payload)
end
end
else
super

View File

@ -183,7 +183,7 @@ module CacheStoreBehavior
end
def test_nil_with_compress_low_compress_threshold
assert_uncompressed(nil, compress: true, compress_threshold: 1)
assert_uncompressed(nil, compress: true, compress_threshold: 20)
end
def test_small_string_with_default_compression_settings
@ -243,12 +243,12 @@ module CacheStoreBehavior
end
def test_incompressible_data
assert_uncompressed(nil, compress: true, compress_threshold: 1)
assert_uncompressed(true, compress: true, compress_threshold: 1)
assert_uncompressed(false, compress: true, compress_threshold: 1)
assert_uncompressed(0, compress: true, compress_threshold: 1)
assert_uncompressed(1.2345, compress: true, compress_threshold: 1)
assert_uncompressed("", compress: true, compress_threshold: 1)
assert_uncompressed(nil, compress: true, compress_threshold: 30)
assert_uncompressed(true, compress: true, compress_threshold: 30)
assert_uncompressed(false, compress: true, compress_threshold: 30)
assert_uncompressed(0, compress: true, compress_threshold: 30)
assert_uncompressed(1.2345, compress: true, compress_threshold: 30)
assert_uncompressed("", compress: true, compress_threshold: 30)
incompressible = nil
@ -600,8 +600,11 @@ module CacheStoreBehavior
actual_entry = @cache.send(:read_entry, @cache.send(:normalize_key, "actual", {}), **{})
uncompressed_entry = @cache.send(:read_entry, @cache.send(:normalize_key, "uncompressed", {}), **{})
actual_size = Marshal.dump(actual_entry).bytesize
uncompressed_size = Marshal.dump(uncompressed_entry).bytesize
actual_payload = @cache.send(:serialize_entry, actual_entry, **@cache.send(:merged_options, options))
uncompressed_payload = @cache.send(:serialize_entry, uncompressed_entry, compress: false)
actual_size = actual_payload.bytesize
uncompressed_size = uncompressed_payload.bytesize
if should_compress
assert_operator actual_size, :<, uncompressed_size, "value should be compressed"

28
activesupport/test/cache/coder_test.rb vendored Normal file
View File

@ -0,0 +1,28 @@
# frozen_string_literal: true
require_relative "../abstract_unit"
require "active_support/cache"
class CacheCoderTest < ActiveSupport::TestCase
def test_new_coder_can_read_legacy_payloads
entry = ActiveSupport::Cache::Entry.new("foobar", expires_in: 1.hour, version: "v42")
deserialized_entry = ActiveSupport::Cache::Coders::Rails70Coder.load(
ActiveSupport::Cache::Coders::Rails61Coder.dump(entry),
)
assert_equal entry.value, deserialized_entry.value
assert_equal entry.version, deserialized_entry.version
assert_equal entry.expires_at, deserialized_entry.expires_at
end
def test_legacy_coder_can_read_new_payloads
entry = ActiveSupport::Cache::Entry.new("foobar", expires_in: 1.hour, version: "v42")
deserialized_entry = ActiveSupport::Cache::Coders::Rails61Coder.load(
ActiveSupport::Cache::Coders::Rails70Coder.dump(entry),
)
assert_equal entry.value, deserialized_entry.value
assert_equal entry.version, deserialized_entry.version
assert_equal entry.expires_at, deserialized_entry.expires_at
end
end

View File

@ -144,3 +144,36 @@ class FileStoreTest < ActiveSupport::TestCase
assert_equal false, @cache.write(1, "aaaaaaaaaa", unless_exist: true)
end
end
class OptimizedFileStoreTest < FileStoreTest
def setup
@previous_format = ActiveSupport::Cache.format_version
ActiveSupport::Cache.format_version = 7.0
super
end
def forward_compatibility
previous_format = ActiveSupport::Cache.format_version
ActiveSupport::Cache.format_version = 6.1
@old_store = lookup_store
ActiveSupport::Cache.format_version = previous_format
@old_store.write("foo", "bar")
assert_equal "bar", @cache.read("foo")
end
def forward_compatibility
previous_format = ActiveSupport::Cache.format_version
ActiveSupport::Cache.format_version = 6.1
@old_store = lookup_store
ActiveSupport::Cache.format_version = previous_format
@cache.write("foo", "bar")
assert_equal "bar", @old_store.read("foo")
end
def teardown
super
ActiveSupport::Cache.format_version = @previous_format
end
end

View File

@ -11,6 +11,7 @@ class SlowDalliClient < Dalli::Client
def get(key, options = {})
if /latency/.match?(key)
sleep 3
super
else
super
end
@ -181,7 +182,7 @@ class MemCacheStoreTest < ActiveSupport::TestCase
def test_unless_exist_expires_when_configured
cache = ActiveSupport::Cache.lookup_store(:mem_cache_store)
assert_called_with client(cache), :add, [ "foo", ActiveSupport::Cache::Entry, 1, Hash ] do
assert_called_with client(cache), :add, [ "foo", Object, 1, Hash ] do
cache.write("foo", "bar", expires_in: 1, unless_exist: true)
end
end
@ -280,3 +281,36 @@ class MemCacheStoreTest < ActiveSupport::TestCase
end
end
end
class OptimizedMemCacheStoreTest < MemCacheStoreTest
def setup
@previous_format = ActiveSupport::Cache.format_version
ActiveSupport::Cache.format_version = 7.0
super
end
def forward_compatibility
previous_format = ActiveSupport::Cache.format_version
ActiveSupport::Cache.format_version = 6.1
@old_store = lookup_store
ActiveSupport::Cache.format_version = previous_format
@old_store.write("foo", "bar")
assert_equal "bar", @cache.read("foo")
end
def forward_compatibility
previous_format = ActiveSupport::Cache.format_version
ActiveSupport::Cache.format_version = 6.1
@old_store = lookup_store
ActiveSupport::Cache.format_version = previous_format
@cache.write("foo", "bar")
assert_equal "bar", @old_store.read("foo")
end
def teardown
super
ActiveSupport::Cache.format_version = @previous_format
end
end

View File

@ -209,6 +209,39 @@ module ActiveSupport::Cache::RedisCacheStoreTests
end
end
class OptimizedRedisCacheStoreCommonBehaviorTest < RedisCacheStoreCommonBehaviorTest
def before_setup
@previous_format = ActiveSupport::Cache.format_version
ActiveSupport::Cache.format_version = 7.0
super
end
def forward_compatibility
previous_format = ActiveSupport::Cache.format_version
ActiveSupport::Cache.format_version = 6.1
@old_store = lookup_store
ActiveSupport::Cache.format_version = previous_format
@old_store.write("foo", "bar")
assert_equal "bar", @cache.read("foo")
end
def forward_compatibility
previous_format = ActiveSupport::Cache.format_version
ActiveSupport::Cache.format_version = 6.1
@old_store = lookup_store
ActiveSupport::Cache.format_version = previous_format
@cache.write("foo", "bar")
assert_equal "bar", @old_store.read("foo")
end
def after_teardown
super
ActiveSupport::Cache.format_version = @previous_format
end
end
class ConnectionPoolBehaviourTest < StoreTest
include ConnectionPoolBehavior

View File

@ -865,6 +865,8 @@ There are a few configuration options available in Active Support:
* `config.active_support.use_authenticated_message_encryption` specifies whether to use AES-256-GCM authenticated encryption as the default cipher for encrypting messages instead of AES-256-CBC.
* `config.active_support.cache_format_version` specifies which version of the cache serializer to use. Possible values are `6.1` and `7.0`. Defaults to `6.1`.
* `ActiveSupport::Logger.silencer` is set to `false` to disable the ability to silence logging in a block. The default is `true`.
* `ActiveSupport::Cache::Store.logger` specifies the logger to use within cache store operations.

View File

@ -210,6 +210,7 @@ module Rails
active_support.hash_digest_class = OpenSSL::Digest::SHA256
active_support.key_generator_hash_digest_class = OpenSSL::Digest::SHA256
active_support.remove_deprecated_time_with_zone_name = true
active_support.cache_format_version = 7.0
end
else
raise "Unknown version #{target_version.to_s.inspect}"