Mirror direct uploads

This commit is contained in:
George Claghorn 2019-05-22 15:07:35 -04:00 committed by GitHub
parent ff34f78248
commit d5a2f7ec14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 159 additions and 74 deletions

View File

@ -1,3 +1,14 @@
* The mirror service supports direct uploads.
New files are directly uploaded to the primary service. When a
directly-uploaded file is attached to a record, a background job is enqueued
to copy it to each secondary service.
Configure the queue used to process mirroring jobs by setting
`config.active_storage.queues.mirror`. The default is `:active_storage_mirror`.
*George Claghorn*
* The S3 service now permits uploading files larger than 5 gigabytes. * The S3 service now permits uploading files larger than 5 gigabytes.
When uploading a file greater than 100 megabytes in size, the service When uploading a file greater than 100 megabytes in size, the service

View File

@ -0,0 +1,13 @@
# frozen_string_literal: true
# Provides asynchronous mirroring of directly-uploaded blobs.
class ActiveStorage::MirrorJob < ActiveStorage::BaseJob
queue_as { ActiveStorage.queues[:mirror] }
discard_on ActiveStorage::FileNotFoundError
retry_on ActiveStorage::IntegrityError, attempts: 10, wait: :exponentially_longer
def perform(key, checksum:)
ActiveStorage::Blob.service.try(:mirror, key, checksum: checksum)
end
end

View File

@ -13,7 +13,7 @@ class ActiveStorage::Attachment < ActiveRecord::Base
delegate_missing_to :blob delegate_missing_to :blob
after_create_commit :analyze_blob_later, :identify_blob after_create_commit :mirror_blob_later, :analyze_blob_later, :identify_blob
after_destroy_commit :purge_dependent_blob_later after_destroy_commit :purge_dependent_blob_later
# Synchronously deletes the attachment and {purges the blob}[rdoc-ref:ActiveStorage::Blob#purge]. # Synchronously deletes the attachment and {purges the blob}[rdoc-ref:ActiveStorage::Blob#purge].
@ -37,6 +37,10 @@ class ActiveStorage::Attachment < ActiveRecord::Base
blob.analyze_later unless blob.analyzed? blob.analyze_later unless blob.analyzed?
end end
def mirror_blob_later
blob.mirror_later
end
def purge_dependent_blob_later def purge_dependent_blob_later
blob&.purge_later if dependent == :purge_later blob&.purge_later if dependent == :purge_later
end end

View File

@ -207,6 +207,9 @@ class ActiveStorage::Blob < ActiveRecord::Base
name: [ "ActiveStorage-#{id}-", filename.extension_with_delimiter ], tmpdir: tmpdir, &block name: [ "ActiveStorage-#{id}-", filename.extension_with_delimiter ], tmpdir: tmpdir, &block
end end
def mirror_later #:nodoc:
ActiveStorage::MirrorJob.perform_later(key, checksum: checksum) if service.respond_to?(:mirror)
end
# Deletes the files on the service associated with the blob. This should only be done if the blob is going to be # Deletes the files on the service associated with the blob. This should only be done if the blob is going to be
# deleted as well or you will essentially have a dead reference. It's recommended to use #purge and #purge_later # deleted as well or you will essentially have a dead reference. It's recommended to use #purge and #purge_later

View File

@ -24,7 +24,7 @@ module ActiveStorage
config.active_storage.previewers = [ ActiveStorage::Previewer::PopplerPDFPreviewer, ActiveStorage::Previewer::MuPDFPreviewer, ActiveStorage::Previewer::VideoPreviewer ] config.active_storage.previewers = [ ActiveStorage::Previewer::PopplerPDFPreviewer, ActiveStorage::Previewer::MuPDFPreviewer, ActiveStorage::Previewer::VideoPreviewer ]
config.active_storage.analyzers = [ ActiveStorage::Analyzer::ImageAnalyzer, ActiveStorage::Analyzer::VideoAnalyzer ] config.active_storage.analyzers = [ ActiveStorage::Analyzer::ImageAnalyzer, ActiveStorage::Analyzer::VideoAnalyzer ]
config.active_storage.paths = ActiveSupport::OrderedOptions.new config.active_storage.paths = ActiveSupport::OrderedOptions.new
config.active_storage.queues = ActiveSupport::OrderedOptions.new config.active_storage.queues = ActiveSupport::InheritableOptions.new(mirror: :active_storage_mirror)
config.active_storage.variable_content_types = %w( config.active_storage.variable_content_types = %w(
image/png image/png
@ -130,7 +130,7 @@ module ActiveStorage
"config.active_storage.queue is deprecated and will be removed in Rails 6.1. " \ "config.active_storage.queue is deprecated and will be removed in Rails 6.1. " \
"Set config.active_storage.queues.purge and config.active_storage.queues.analysis instead." "Set config.active_storage.queues.purge and config.active_storage.queues.analysis instead."
ActiveStorage.queues = { purge: queue, analysis: queue } ActiveStorage.queues = { purge: queue, analysis: queue, mirror: queue }
else else
ActiveStorage.queues = app.config.active_storage.queues || {} ActiveStorage.queues = app.config.active_storage.queues || {}
end end

View File

@ -32,6 +32,12 @@ module ActiveStorage
debug event, color("Generated URL for file at key: #{key_in(event)} (#{event.payload[:url]})", BLUE) debug event, color("Generated URL for file at key: #{key_in(event)} (#{event.payload[:url]})", BLUE)
end end
def service_mirror(event)
message = "Mirrored file at key: #{key_in(event)}"
message += " (checksum: #{event.payload[:checksum]})" if event.payload[:checksum]
debug event, color(message, GREEN)
end
def logger def logger
ActiveStorage.logger ActiveStorage.logger
end end

View File

@ -4,12 +4,17 @@ require "active_support/core_ext/module/delegation"
module ActiveStorage module ActiveStorage
# Wraps a set of mirror services and provides a single ActiveStorage::Service object that will all # Wraps a set of mirror services and provides a single ActiveStorage::Service object that will all
# have the files uploaded to them. A +primary+ service is designated to answer calls to +download+, +exists?+, # have the files uploaded to them. A +primary+ service is designated to answer calls to:
# and +url+. # * +download+
# * +exists?+
# * +url+
# * +url_for_direct_upload+
# * +headers_for_direct_upload+
class Service::MirrorService < Service class Service::MirrorService < Service
attr_reader :primary, :mirrors attr_reader :primary, :mirrors
delegate :download, :download_chunk, :exist?, :url, :path_for, to: :primary delegate :download, :download_chunk, :exist?, :url,
:url_for_direct_upload, :headers_for_direct_upload, :path_for, to: :primary
# Stitch together from named services. # Stitch together from named services.
def self.build(primary:, mirrors:, configurator:, **options) #:nodoc: def self.build(primary:, mirrors:, configurator:, **options) #:nodoc:
@ -26,7 +31,8 @@ module ActiveStorage
# ensure a match when the upload has completed or raise an ActiveStorage::IntegrityError. # ensure a match when the upload has completed or raise an ActiveStorage::IntegrityError.
def upload(key, io, checksum: nil, **options) def upload(key, io, checksum: nil, **options)
each_service.collect do |service| each_service.collect do |service|
service.upload key, io.tap(&:rewind), checksum: checksum, **options io.rewind
service.upload key, io, checksum: checksum, **options
end end
end end
@ -40,6 +46,21 @@ module ActiveStorage
perform_across_services :delete_prefixed, prefix perform_across_services :delete_prefixed, prefix
end end
# Copy the file at the +key+ from the primary service to each of the mirrors where it doesn't already exist.
def mirror(key, checksum:)
instrument :mirror, key: key, checksum: checksum do
if (mirrors_in_need_of_mirroring = mirrors.select { |service| !service.exist?(key) }).any?
primary.open(key, checksum: checksum) do |io|
mirrors_in_need_of_mirroring.each do |service|
io.rewind
service.upload key, io, checksum: checksum
end
end
end
end
end
private private
def each_service(&block) def each_service(&block)
[ primary, *mirrors ].each(&block) [ primary, *mirrors ].each(&block)

View File

@ -269,46 +269,6 @@ class ActiveStorage::ManyAttachedTest < ActiveSupport::TestCase
end end
end end
test "analyzing a new blob from an uploaded file after attaching it to an existing record" do
perform_enqueued_jobs do
@user.highlights.attach fixture_file_upload("racecar.jpg")
end
assert @user.highlights.reload.first.analyzed?
assert_equal 4104, @user.highlights.first.metadata[:width]
assert_equal 2736, @user.highlights.first.metadata[:height]
end
test "analyzing a new blob from an uploaded file after attaching it to an existing record via update" do
perform_enqueued_jobs do
@user.update! highlights: [ fixture_file_upload("racecar.jpg") ]
end
assert @user.highlights.reload.first.analyzed?
assert_equal 4104, @user.highlights.first.metadata[:width]
assert_equal 2736, @user.highlights.first.metadata[:height]
end
test "analyzing a directly-uploaded blob after attaching it to an existing record" do
perform_enqueued_jobs do
@user.highlights.attach directly_upload_file_blob(filename: "racecar.jpg")
end
assert @user.highlights.reload.first.analyzed?
assert_equal 4104, @user.highlights.first.metadata[:width]
assert_equal 2736, @user.highlights.first.metadata[:height]
end
test "analyzing a directly-uploaded blob after attaching it to an existing record via update" do
perform_enqueued_jobs do
@user.update! highlights: [ directly_upload_file_blob(filename: "racecar.jpg") ]
end
assert @user.highlights.reload.first.analyzed?
assert_equal 4104, @user.highlights.first.metadata[:width]
assert_equal 2736, @user.highlights.first.metadata[:height]
end
test "attaching existing blobs to a new record" do test "attaching existing blobs to a new record" do
User.new(name: "Jason").tap do |user| User.new(name: "Jason").tap do |user|
user.highlights.attach create_blob(filename: "funky.jpg"), create_blob(filename: "town.jpg") user.highlights.attach create_blob(filename: "funky.jpg"), create_blob(filename: "town.jpg")
@ -422,24 +382,6 @@ class ActiveStorage::ManyAttachedTest < ActiveSupport::TestCase
assert_equal "Could not find or build blob: expected attachable, got :foo", error.message assert_equal "Could not find or build blob: expected attachable, got :foo", error.message
end end
test "analyzing a new blob from an uploaded file after attaching it to a new record" do
perform_enqueued_jobs do
user = User.create!(name: "Jason", highlights: [ fixture_file_upload("racecar.jpg") ])
assert user.highlights.reload.first.analyzed?
assert_equal 4104, user.highlights.first.metadata[:width]
assert_equal 2736, user.highlights.first.metadata[:height]
end
end
test "analyzing a directly-uploaded blob after attaching it to a new record" do
perform_enqueued_jobs do
user = User.create!(name: "Jason", highlights: [ directly_upload_file_blob(filename: "racecar.jpg") ])
assert user.highlights.reload.first.analyzed?
assert_equal 4104, user.highlights.first.metadata[:width]
assert_equal 2736, user.highlights.first.metadata[:height]
end
end
test "detaching" do test "detaching" do
[ create_blob(filename: "funky.jpg"), create_blob(filename: "town.jpg") ].tap do |blobs| [ create_blob(filename: "funky.jpg"), create_blob(filename: "town.jpg") ].tap do |blobs|
@user.highlights.attach blobs @user.highlights.attach blobs

View File

@ -0,0 +1,53 @@
# frozen_string_literal: true
require "test_helper"
require "database/setup"
class ActiveStorage::AttachmentTest < ActiveSupport::TestCase
include ActiveJob::TestHelper
setup do
@user = User.create!(name: "Josh")
end
teardown { ActiveStorage::Blob.all.each(&:delete) }
test "analyzing a directly-uploaded blob after attaching it" do
blob = directly_upload_file_blob(filename: "racecar.jpg")
assert_not blob.analyzed?
perform_enqueued_jobs do
@user.highlights.attach(blob)
end
assert blob.reload.analyzed?
assert_equal 4104, blob.metadata[:width]
assert_equal 2736, blob.metadata[:height]
end
test "mirroring a directly-uploaded blob after attaching it" do
previous_service, ActiveStorage::Blob.service = ActiveStorage::Blob.service, build_mirror_service
blob = directly_upload_file_blob
assert_not ActiveStorage::Blob.service.mirrors.second.exist?(blob.key)
perform_enqueued_jobs do
@user.highlights.attach(blob)
end
assert ActiveStorage::Blob.service.mirrors.second.exist?(blob.key)
ensure
ActiveStorage::Blob.service = previous_service
end
private
def build_mirror_service
ActiveStorage::Service::MirrorService.new \
primary: build_disk_service("primary"),
mirrors: 3.times.collect { |i| build_disk_service("mirror_#{i + 1}") }
end
def build_disk_service(purpose)
ActiveStorage::Service::DiskService.new(root: Dir.mktmpdir("active_storage_tests_#{purpose}"))
end
end

View File

@ -53,6 +53,20 @@ class ActiveStorage::Service::MirrorServiceTest < ActiveSupport::TestCase
end end
end end
test "mirroring a file from the primary service to secondary services where it doesn't exist" do
key = SecureRandom.base58(24)
data = "Something else entirely!"
checksum = Digest::MD5.base64digest(data)
@service.primary.upload key, StringIO.new(data), checksum: checksum
@service.mirrors.third.upload key, StringIO.new("Surprise!")
@service.mirror key, checksum: checksum
assert_equal data, @service.mirrors.first.download(key)
assert_equal data, @service.mirrors.second.download(key)
assert_equal "Surprise!", @service.mirrors.third.download(key)
end
test "URL generation in primary service" do test "URL generation in primary service" do
filename = ActiveStorage::Filename.new("test.txt") filename = ActiveStorage::Filename.new("test.txt")

View File

@ -7,6 +7,7 @@ require "bundler/setup"
require "active_support" require "active_support"
require "active_support/test_case" require "active_support/test_case"
require "active_support/testing/autorun" require "active_support/testing/autorun"
require "active_storage/service/mirror_service"
require "image_processing/mini_magick" require "image_processing/mini_magick"
begin begin
@ -67,7 +68,8 @@ class ActiveSupport::TestCase
checksum = Digest::MD5.file(file).base64digest checksum = Digest::MD5.file(file).base64digest
create_blob_before_direct_upload(filename: filename, byte_size: byte_size, checksum: checksum, content_type: content_type).tap do |blob| create_blob_before_direct_upload(filename: filename, byte_size: byte_size, checksum: checksum, content_type: content_type).tap do |blob|
ActiveStorage::Blob.service.upload(blob.key, file.open) service = ActiveStorage::Blob.service.try(:primary) || ActiveStorage::Blob.service
service.upload(blob.key, file.open)
end end
end end

View File

@ -189,14 +189,21 @@ gem "google-cloud-storage", "~> 1.11", require: false
### Mirror Service ### Mirror Service
You can keep multiple services in sync by defining a mirror service. When a file You can keep multiple services in sync by defining a mirror service. A mirror
is uploaded or deleted, it's done across all the mirrored services. Mirrored service replicates uploads and deletes across two or more subordinate services.
services can be used to facilitate a migration between services in production.
You can start mirroring to the new service, copy existing files from the old A mirror service is intended to be used temporarily during a migration between
service to the new, then go all-in on the new service. Define each of the services in production. You can start mirroring to a new service, copy
services you'd like to use as described above and reference them from a mirrored pre-existing files from the old service to the new, then go all-in on the new
service. service.
NOTE: Mirroring is not atomic. It is possible for an upload to succeed on the
primary service and fail on any of the subordinate services. Before going
all-in on a new service, verify that all files have been copied.
Define each of the services you'd like to mirror as described above. Reference
them by name when defining a mirror service:
```yaml ```yaml
s3_west_coast: s3_west_coast:
service: S3 service: S3
@ -219,9 +226,12 @@ production:
- s3_west_coast - s3_west_coast
``` ```
NOTE: Files are served from the primary service. Although all secondary services receive uploads, downloads are always handled
by the primary service.
NOTE: This is not compatible with the [direct uploads](#direct-uploads) feature. Mirror services are compatible with direct uploads. New files are directly
uploaded to the primary service. When a directly-uploaded file is attached to a
record, a background job is enqueued to copy it to the secondary services.
Attaching Files to Records Attaching Files to Records
-------------------------- --------------------------

View File

@ -852,6 +852,12 @@ text/javascript image/svg+xml application/postscript application/x-shockwave-fla
config.active_storage.queues.purge = :low_priority config.active_storage.queues.purge = :low_priority
``` ```
* `config.active_storage.queues.mirror` accepts a symbol indicating the Active Job queue to use for direct upload mirroring jobs. The default is `:active_storage_mirror`.
```ruby
config.active_storage.queues.mirror = :low_priority
```
* `config.active_storage.logger` can be used to set the logger used by Active Storage. Accepts a logger conforming to the interface of Log4r or the default Ruby Logger class. * `config.active_storage.logger` can be used to set the logger used by Active Storage. Accepts a logger conforming to the interface of Log4r or the default Ruby Logger class.
```ruby ```ruby