Initial commit

commit 9e386d9ff6

@@ -0,0 +1,4 @@
# Changelog

## master (unreleased)

@@ -0,0 +1,20 @@
The MIT License

Copyright (c) 2020 Skroutz S.A.

Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

@@ -0,0 +1,103 @@
# RSpecQ

RSpecQ (`rspecq`) distributes and executes an RSpec suite over many workers,
using a centralized queue backed by Redis.

RSpecQ is heavily inspired by [test-queue](https://github.com/tmm1/test-queue)
and [ci-queue](https://github.com/Shopify/ci-queue).

## Why don't you just use ci-queue?

While evaluating ci-queue for our RSpec suite, we observed slow boot times
in the workers (up to 3 minutes), increased memory consumption and too much
disk I/O on boot. This is due to the fact that a worker in ci-queue has to
load every spec file on boot. This can be problematic for applications with
a large number of spec files.

RSpecQ works with spec files as its unit of work (as opposed to ci-queue, which
works with individual examples). This means that an RSpecQ worker does not
have to load all spec files at once, so it doesn't have the aforementioned
problems. It also allows suites to keep using `before(:all)` hooks
(which ci-queue explicitly rejects). (Note: RSpecQ also schedules individual
examples, but only when this is deemed necessary; see section
"Spec file splitting".)

We also observed faster build times by scheduling spec files instead of
individual examples, due to far fewer Redis operations.

The downside of this design is that it's more complicated, since the scheduling
of spec files happens based on timings calculated from previous runs. This
means that RSpecQ maintains a key with the timing of each job and updates it
on every run (if the `--timings` option was used). Also, RSpecQ has a "slow
file threshold" which currently has to be set manually (but this could be
improved).

*Update*: ci-queue deprecated support for RSpec, so there's that.

## Usage

Each worker needs to know the build it will participate in, its name and where
Redis is located. To start a worker:

```shell
$ rspecq --build-id=foo --worker-id=worker1 --redis=redis://localhost
```

To view the progress of the build, use `--report`:

```shell
$ rspecq --build-id=foo --worker-id=reporter --redis=redis://localhost --report
```

For detailed info use `--help`.

## How it works

The basic idea is identical to ci-queue, so please refer to its README.

### Terminology

- Job: the smallest unit of work, which is usually a spec file
  (e.g. `./spec/models/foo_spec.rb`) but can also be an individual example
  (e.g. `./spec/models/foo_spec.rb[1:2:1]`) if the file is too slow
- Queue: a collection of Redis-backed structures that hold all the necessary
  information for RSpecQ to function. This includes timing statistics, jobs to
  be executed, the failure reports, requeueing statistics and more.
- Worker: a process that, given a build id, pops jobs of that build and
  executes them using RSpec
- Reporter: a process that, given a build id, waits for the build to finish
  and prints the summary report (examples executed, build result, failures etc.)

### Spec file splitting

Very slow files may limit how fast the suite can execute. For example,
a worker may spend 10 minutes running a single slow file, while all the other
workers finish after 8 minutes. To overcome this issue, RSpecQ splits
files whose execution time is above a certain threshold
(set with the `--file-split-threshold` option) and instead schedules them as
individual examples.

In the future, we'd like the slow file threshold to be calculated and set
dynamically.

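The splitting decision can be sketched in plain Ruby (a simplified illustration, not RSpecQ's actual implementation; `split_plan` is a hypothetical helper and `timings` stands for the per-job durations recorded from previous runs):

```ruby
# Partition spec files into those scheduled whole and those to be split
# into individual examples, based on timings from previous runs.
# Hypothetical helper for illustration only.
def split_plan(files, timings, threshold)
  slow, fast = files.partition { |f| (timings[f] || 0) >= threshold }
  { schedule_whole: fast, split_into_examples: slow }
end

timings = { "spec/a_spec.rb" => 620.0, "spec/b_spec.rb" => 45.0 }
plan = split_plan(timings.keys, timings, 300)
# plan[:split_into_examples] contains "spec/a_spec.rb"; its examples would
# then be scheduled individually (e.g. "spec/a_spec.rb[1:1]")
```

Files below the threshold are enqueued as-is; only the slow ones pay the cost of being loaded up front to enumerate their examples.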
### Requeues

As a mitigation measure for flaky tests, if an example fails it will be put
back to the queue to be picked up by another worker. This is repeated up to
a certain number of times, after which the example is considered a legitimate
failure and is printed in the final report (`--report`).

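The requeue bookkeeping can be sketched as follows (a simplified in-memory illustration; RSpecQ actually keeps the counters in a Redis hash, updated atomically via a Lua script):

```ruby
MAX_REQUEUES = 3 # the default requeue limit

# Returns true if the failed job was put back in the queue, false if it
# exhausted its attempts and should be reported as a legitimate failure.
# `requeues` maps each job to how many times it has been requeued so far.
def requeue(job, requeues, max_requeues = MAX_REQUEUES)
  return false if requeues.fetch(job, 0) >= max_requeues
  requeues[job] = requeues.fetch(job, 0) + 1
  true
end

requeues = {}
results = 4.times.map { requeue("spec/flaky_spec.rb[1:2]", requeues) }
# the first three failures are retried; the fourth becomes a real failure
```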
### Worker failures

Workers emit a timestamp after each example, as a heartbeat, to denote
that they're fine and performing jobs. If a worker hasn't reported for
a given amount of time (see `WORKER_LIVENESS_SEC`), it is considered dead
and the job it had reserved will be requeued, so that it is picked up by
another worker.

This protects us against unrecoverable worker failures (e.g. a segfault).

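The liveness check can be illustrated with a short sketch (an assumed simplification: heartbeats are modeled as a plain Hash of unix timestamps, while the real implementation stores them in a Redis sorted set and scans it with a Lua script):

```ruby
WORKER_LIVENESS_SEC = 60.0 # same value as the constant in lib/rspecq.rb

# A worker is considered dead if it hasn't heartbeated within the liveness
# window; its reserved job would then be requeued.
def dead_workers(heartbeats, now, liveness = WORKER_LIVENESS_SEC)
  heartbeats.select { |_worker, at| now - at > liveness }.keys
end

heartbeats = { "worker1" => 1000, "worker2" => 950 }
dead_workers(heartbeats, 1015) # => ["worker2"]
```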
## License

RSpecQ is licensed under MIT. See [LICENSE](LICENSE).

@@ -0,0 +1,67 @@
#!/usr/bin/env ruby
require "optionparser"
require "rspecq"

opts = {}
OptionParser.new do |o|
  o.banner = "Usage: #{$PROGRAM_NAME} [opts] [files_or_directories_to_run]"

  o.on("--build-id ID", "A unique identifier denoting the build") do |v|
    opts[:build_id] = v
  end

  o.on("--worker-id ID", "A unique identifier denoting the worker") do |v|
    opts[:worker_id] = v
  end

  o.on("--redis HOST", "Redis HOST to connect to (default: 127.0.0.1)") do |v|
    opts[:redis_host] = v || "127.0.0.1"
  end

  o.on("--timings", "Populate global job timings in Redis") do |v|
    opts[:timings] = v
  end

  o.on("--file-split-threshold N", "Split spec files slower than N sec. and " \
       "schedule them by example (default: 999999)") do |v|
    opts[:file_split_threshold] = Float(v)
  end

  o.on("--report", "Do not execute tests but wait until queue is empty and " \
       "print a report") do |v|
    opts[:report] = v
  end

  o.on("--report-timeout N", Integer, "Fail if queue is not empty after " \
       "N seconds. Only applicable if --report is enabled " \
       "(default: 3600)") do |v|
    opts[:report_timeout] = v
  end
end.parse!

[:build_id, :worker_id].each do |o|
  raise OptionParser::MissingArgument.new(o) if opts[o].nil?
end

if opts[:report]
  reporter = RSpecQ::Reporter.new(
    build_id: opts[:build_id],
    worker_id: opts[:worker_id],
    timeout: opts[:report_timeout] || 3600,
    redis_host: opts[:redis_host],
  )

  reporter.report
else
  worker = RSpecQ::Worker.new(
    build_id: opts[:build_id],
    worker_id: opts[:worker_id],
    redis_host: opts[:redis_host],
    files_or_dirs_to_run: ARGV[0] || "spec",
  )

  worker.populate_timings = opts[:timings]
  worker.file_split_threshold = opts[:file_split_threshold] || 999999
  worker.work
end

@@ -0,0 +1,21 @@
require "rspec/core"

module RSpecQ
  MAX_REQUEUES = 3

  # If a worker hasn't executed an RSpec example for more than this time
  # (in seconds), it is considered dead and its reserved work will be put back
  # to the queue, to be picked up by another worker.
  WORKER_LIVENESS_SEC = 60.0
end

require_relative "rspecq/formatters/example_count_recorder"
require_relative "rspecq/formatters/failure_recorder"
require_relative "rspecq/formatters/job_timing_recorder"
require_relative "rspecq/formatters/worker_heartbeat_recorder"

require_relative "rspecq/queue"
require_relative "rspecq/reporter"
require_relative "rspecq/worker"

require_relative "rspecq/version"

@@ -0,0 +1,15 @@
module RSpecQ
  module Formatters
    # Increments the example counter after each job.
    class ExampleCountRecorder
      def initialize(queue)
        @queue = queue
      end

      def dump_summary(summary)
        n = summary.examples.count
        @queue.increment_example_count(n) if n > 0
      end
    end
  end
end

@@ -0,0 +1,50 @@
module RSpecQ
  module Formatters
    class FailureRecorder
      def initialize(queue, job)
        @queue = queue
        @job = job
        @colorizer = RSpec::Core::Formatters::ConsoleCodes
        @non_example_error_recorded = false
      end

      # Here we're notified about errors occurring outside of examples.
      #
      # NOTE: Upon such an error, RSpec emits multiple notifications but we only
      # want the _first_, which is the one that contains the error backtrace.
      # That's why we have to keep track of whether we've already received the
      # needed notification and act accordingly.
      def message(n)
        if RSpec.world.non_example_failure && !@non_example_error_recorded
          @queue.record_non_example_error(@job, n.message)
          @non_example_error_recorded = true
        end
      end

      def example_failed(notification)
        example = notification.example

        if @queue.requeue_job(example.id, MAX_REQUEUES)
          # HACK: try to avoid picking the job we just requeued; we want it
          # to be picked up by a different worker
          sleep 0.5
          return
        end

        presenter = RSpec::Core::Formatters::ExceptionPresenter.new(
          example.exception, example)

        msg = presenter.fully_formatted(nil, @colorizer)
        msg << "\n"
        msg << @colorizer.wrap(
          "bin/rspec #{example.location_rerun_argument}",
          RSpec.configuration.failure_color)

        msg << @colorizer.wrap(
          " # #{example.full_description}", RSpec.configuration.detail_color)

        @queue.record_example_failure(notification.example.id, msg)
      end
    end
  end
end

@@ -0,0 +1,14 @@
module RSpecQ
  module Formatters
    class JobTimingRecorder
      def initialize(queue, job)
        @queue = queue
        @job = job
      end

      def dump_summary(summary)
        @queue.record_timing(@job, Float(summary.duration))
      end
    end
  end
end

@@ -0,0 +1,17 @@
module RSpecQ
  module Formatters
    # Updates the respective heartbeat key of the worker after each example.
    #
    # Refer to the documentation of WORKER_LIVENESS_SEC for more info.
    class WorkerHeartbeatRecorder
      def initialize(worker)
        @worker = worker
      end

      def example_finished(*)
        @worker.update_heartbeat
      end
    end
  end
end

@@ -0,0 +1,288 @@
require "redis"

module RSpecQ
  class Queue
    RESERVE_JOB = <<~LUA.freeze
      local queue = KEYS[1]
      local queue_running = KEYS[2]
      local worker_id = ARGV[1]

      local job = redis.call('lpop', queue)
      if job then
        redis.call('hset', queue_running, worker_id, job)
        return job
      else
        return nil
      end
    LUA

    # Scans for dead workers and puts their reserved jobs back to the queue.
    REQUEUE_LOST_JOB = <<~LUA.freeze
      local worker_heartbeats = KEYS[1]
      local queue_running = KEYS[2]
      local queue_unprocessed = KEYS[3]
      local time_now = ARGV[1]
      local timeout = ARGV[2]

      local dead_workers = redis.call('zrangebyscore', worker_heartbeats, 0, time_now - timeout)
      for _, worker in ipairs(dead_workers) do
        local job = redis.call('hget', queue_running, worker)
        if job then
          redis.call('lpush', queue_unprocessed, job)
          redis.call('hdel', queue_running, worker)
          return job
        end
      end

      return nil
    LUA

    REQUEUE_JOB = <<~LUA.freeze
      local key_queue_unprocessed = KEYS[1]
      local key_requeues = KEYS[2]
      local job = ARGV[1]
      local max_requeues = ARGV[2]

      -- HGET and ARGV values are strings; convert them to numbers, otherwise
      -- the comparison would be lexicographic
      local requeued_times = redis.call('hget', key_requeues, job)
      if requeued_times and tonumber(requeued_times) >= tonumber(max_requeues) then
        return nil
      end

      redis.call('lpush', key_queue_unprocessed, job)
      redis.call('hincrby', key_requeues, job, 1)

      return true
    LUA

    STATUS_INITIALIZING = "initializing".freeze
    STATUS_READY = "ready".freeze

    def initialize(build_id, worker_id, redis_host)
      @build_id = build_id
      @worker_id = worker_id
      @redis = Redis.new(host: redis_host, id: worker_id)
    end

    # NOTE: jobs will be processed from head to tail (lpop)
    def publish(jobs)
      @redis.multi do
        @redis.rpush(key_queue_unprocessed, jobs)
        @redis.set(key_queue_status, STATUS_READY)
      end.first
    end

    def reserve_job
      @redis.eval(
        RESERVE_JOB,
        keys: [
          key_queue_unprocessed,
          key_queue_running,
        ],
        argv: [@worker_id]
      )
    end

    def requeue_lost_job
      @redis.eval(
        REQUEUE_LOST_JOB,
        keys: [
          key_worker_heartbeats,
          key_queue_running,
          key_queue_unprocessed
        ],
        argv: [
          current_time,
          WORKER_LIVENESS_SEC
        ]
      )
    end

    # NOTE: The same job might happen to be acknowledged more than once, in
    # the case of requeues.
    def acknowledge_job(job)
      @redis.multi do
        @redis.hdel(key_queue_running, @worker_id)
        @redis.sadd(key_queue_processed, job)
      end
    end

    # Put job at the head of the queue to be re-processed right after, by
    # another worker. This is a mitigation measure against flaky tests.
    #
    # Returns nil if the job hit the requeue limit and therefore was not
    # requeued and should be considered a failure.
    def requeue_job(job, max_requeues)
      return false if max_requeues.zero?

      @redis.eval(
        REQUEUE_JOB,
        keys: [key_queue_unprocessed, key_requeues],
        argv: [job, max_requeues],
      )
    end

    def record_example_failure(example_id, message)
      @redis.hset(key_failures, example_id, message)
    end

    # For errors that occurred outside of examples (e.g. while loading a spec
    # file)
    def record_non_example_error(job, message)
      @redis.hset(key_errors, job, message)
    end

    def record_timing(job, duration)
      @redis.zadd(key_timings, duration, job)
    end

    def record_build_time(duration)
      @redis.multi do
        @redis.lpush(key_build_times, Float(duration))
        @redis.ltrim(key_build_times, 0, 99)
      end
    end

    def record_worker_heartbeat
      @redis.zadd(key_worker_heartbeats, current_time, @worker_id)
    end

    def increment_example_count(n)
      @redis.incrby(key_example_count, n)
    end

    def example_count
      @redis.get(key_example_count) || 0
    end

    def processed_jobs_count
      @redis.scard(key_queue_processed)
    end

    def become_master
      @redis.setnx(key_queue_status, STATUS_INITIALIZING)
    end

    # ordered by execution time desc (slowest are in the head)
    def timings
      Hash[@redis.zrevrange(key_timings, 0, -1, withscores: true)]
    end

    def example_failures
      @redis.hgetall(key_failures)
    end

    def non_example_errors
      @redis.hgetall(key_errors)
    end

    def exhausted?
      return false if !published?

      @redis.multi do
        @redis.llen(key_queue_unprocessed)
        @redis.hlen(key_queue_running)
      end.inject(:+).zero?
    end

    def published?
      @redis.get(key_queue_status) == STATUS_READY
    end

    def wait_until_published(timeout=30)
      (timeout * 10).times do
        return if published?
        sleep 0.1
      end

      raise "Queue not yet published after #{timeout} seconds"
    end

    def build_successful?
      exhausted? && example_failures.empty? && non_example_errors.empty?
    end

    private

    def key(*keys)
      [@build_id, keys].join(":")
    end

    # redis: STRING [STATUS_INITIALIZING, STATUS_READY]
    def key_queue_status
      key("queue", "status")
    end

    # redis: LIST<job>
    def key_queue_unprocessed
      key("queue", "unprocessed")
    end

    # redis: HASH<worker_id => job>
    def key_queue_running
      key("queue", "running")
    end

    # redis: SET<job>
    def key_queue_processed
      key("queue", "processed")
    end

    # Contains regular RSpec example failures.
    #
    # redis: HASH<example_id => error message>
    def key_failures
      key("example_failures")
    end

    # Contains errors raised outside of RSpec examples
    # (e.g. a syntax error in spec_helper.rb).
    #
    # redis: HASH<job => error message>
    def key_errors
      key("errors")
    end

    # As a mitigation mechanism for flaky tests, we requeue example failures
    # to be retried by another worker, up to a certain number of times.
    #
    # redis: HASH<job => times_retried>
    def key_requeues
      key("requeues")
    end

    # The total number of examples executed, including those that were
    # requeued.
    #
    # redis: STRING<integer>
    def key_example_count
      key("example_count")
    end

    # redis: ZSET<worker_id => timestamp>
    #
    # Timestamp of the last example processed by each worker.
    def key_worker_heartbeats
      key("worker_heartbeats")
    end

    # redis: ZSET<job => duration>
    #
    # NOTE: This key is not scoped to a build (i.e. it is shared among all
    # builds), so be careful to only publish timings from a single branch
    # (e.g. master). Otherwise, timings won't be accurate.
    def key_timings
      "timings"
    end

    # redis: LIST<duration>
    #
    # Last build is at the head of the list.
    def key_build_times
      "build_times"
    end

    # We don't use any Ruby `Time` methods because specs that use timecop in
    # before(:all) hooks will mess up our times.
    def current_time
      @redis.time[0]
    end
  end
end

@@ -0,0 +1,95 @@
module RSpecQ
  class Reporter
    def initialize(build_id:, worker_id:, timeout:, redis_host:)
      @build_id = build_id
      @worker_id = worker_id
      @timeout = timeout
      @queue = Queue.new(build_id, worker_id, redis_host)

      # We want feedback to be immediately printed to CI users, so
      # we disable buffering.
      STDOUT.sync = true
    end

    def report
      t = measure_duration { @queue.wait_until_published }

      finished = false

      reported_failures = {}
      failure_heading_printed = false

      tests_duration = measure_duration do
        @timeout.times do
          @queue.example_failures.each do |job, rspec_output|
            next if reported_failures[job]

            if !failure_heading_printed
              puts "\nFailures:\n"
              failure_heading_printed = true
            end

            reported_failures[job] = true
            puts failure_formatted(rspec_output)
          end

          if !@queue.exhausted?
            sleep 1
            next
          end

          finished = true
          break
        end
      end

      raise "Build not finished after #{@timeout} seconds" if !finished

      @queue.record_build_time(tests_duration)
      puts summary(@queue.example_failures, @queue.non_example_errors,
                   humanize_duration(tests_duration))

      exit 1 if !@queue.build_successful?
    end

    private

    def measure_duration
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      yield
      (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start).round(2)
    end

    # We try to keep this output consistent with RSpec's original output
    def summary(failures, errors, duration)
      failed_examples_section = "\nFailed examples:\n\n"

      failures.each do |_job, msg|
        parts = msg.split("\n")
        failed_examples_section << "  #{parts[-1]}\n"
      end

      summary = ""
      summary << failed_examples_section if !failures.empty?

      errors.each { |_job, msg| summary << msg }

      summary << "\n"
      summary << "Total results:\n"
      summary << "  #{@queue.example_count} examples " \
                 "(#{@queue.processed_jobs_count} jobs), " \
                 "#{failures.count} failures, " \
                 "#{errors.count} errors"
      summary << "\n\n"
      summary << "Spec execution time: #{duration}"
    end

    def failure_formatted(rspec_output)
      rspec_output.split("\n")[0..-2].join("\n")
    end

    def humanize_duration(seconds)
      Time.at(seconds).utc.strftime("%H:%M:%S")
    end
  end
end

@@ -0,0 +1,3 @@
module RSpecQ
  VERSION = "0.0.1.pre1".freeze
end

@@ -0,0 +1,185 @@
require "json"
require "pathname"
require "pp"

module RSpecQ
  class Worker
    HEARTBEAT_FREQUENCY = WORKER_LIVENESS_SEC / 6

    # If true, job timings will be populated in the global Redis timings key.
    #
    # Defaults to false.
    attr_accessor :populate_timings

    # If set, spec files that are known to take more than this value to finish
    # will be split and scheduled on a per-example basis.
    attr_accessor :file_split_threshold

    def initialize(build_id:, worker_id:, redis_host:, files_or_dirs_to_run:)
      @build_id = build_id
      @worker_id = worker_id
      @queue = Queue.new(build_id, worker_id, redis_host)
      @files_or_dirs_to_run = files_or_dirs_to_run
      @populate_timings = false
      @file_split_threshold = 999999

      RSpec::Core::Formatters.register(Formatters::JobTimingRecorder, :dump_summary)
      RSpec::Core::Formatters.register(Formatters::ExampleCountRecorder, :dump_summary)
      RSpec::Core::Formatters.register(Formatters::FailureRecorder, :example_failed, :message)
      RSpec::Core::Formatters.register(Formatters::WorkerHeartbeatRecorder, :example_finished)
    end

    def work
      puts "Working for build #{@build_id} (worker=#{@worker_id})"

      try_publish_queue!(@queue)
      @queue.wait_until_published

      loop do
        # We have to bootstrap this so that it can be used in the first call
        # to `requeue_lost_job` inside the work loop.
        update_heartbeat

        lost = @queue.requeue_lost_job
        puts "Requeued lost job: #{lost}" if lost

        # TODO: can we make `reserve_job` also act like `exhausted?` and get
        # rid of `exhausted?` (i.e. return false if no jobs remain)
        job = @queue.reserve_job

        # build is finished
        return if job.nil? && @queue.exhausted?

        next if job.nil?

        puts
        puts "Executing #{job}"

        reset_rspec_state!

        # reconfigure rspec
        RSpec.configuration.detail_color = :magenta
        RSpec.configuration.seed = srand && srand % 0xFFFF
        RSpec.configuration.backtrace_formatter.filter_gem('rspecq')
        RSpec.configuration.add_formatter(Formatters::FailureRecorder.new(@queue, job))
        RSpec.configuration.add_formatter(Formatters::ExampleCountRecorder.new(@queue))
        RSpec.configuration.add_formatter(Formatters::WorkerHeartbeatRecorder.new(self))

        if populate_timings
          RSpec.configuration.add_formatter(Formatters::JobTimingRecorder.new(@queue, job))
        end

        opts = RSpec::Core::ConfigurationOptions.new(["--format", "progress", job])
        _result = RSpec::Core::Runner.new(opts).run($stderr, $stdout)

        @queue.acknowledge_job(job)
      end
    end

    # Update the worker heartbeat if necessary
    def update_heartbeat
      if @heartbeat_updated_at.nil? || elapsed(@heartbeat_updated_at) >= HEARTBEAT_FREQUENCY
        @queue.record_worker_heartbeat
        @heartbeat_updated_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      end
    end

    private

    def reset_rspec_state!
      RSpec.clear_examples

      # TODO: remove after https://github.com/rspec/rspec-core/pull/2723
      RSpec.world.instance_variable_set(:@example_group_counts_by_spec_file, Hash.new(0))

      # RSpec.clear_examples does not reset those, which causes issues when
      # a non-example error occurs (subsequent jobs are not executed)
      # TODO: upstream
      RSpec.world.non_example_failure = false

      # We don't want an error that occurred outside of the examples (which
      # would set this to `true`) to stop the worker.
      RSpec.world.wants_to_quit = false
    end

    def try_publish_queue!(queue)
      return if !queue.become_master

      RSpec.configuration.files_or_directories_to_run = @files_or_dirs_to_run
      files_to_run = RSpec.configuration.files_to_run.map { |j| relative_path(j) }

      timings = queue.timings
      if timings.empty?
        # TODO: should be a warning reported somewhere (Sentry?)
        q_size = queue.publish(files_to_run.shuffle)
        puts "WARNING: No timings found! Published queue in " \
             "random order (size=#{q_size})"
        return
      end

      slow_files = timings.take_while do |_job, duration|
        duration >= file_split_threshold
      end.map(&:first) & files_to_run

      if slow_files.any?
        puts "Slow files (threshold=#{file_split_threshold}): #{slow_files}"
      end

      # prepare jobs to run
      jobs = []
      jobs.concat(files_to_run - slow_files)
      jobs.concat(files_to_example_ids(slow_files)) if slow_files.any?

      # assign timings to all of them
      default_timing = timings.values[timings.values.size/2]

      jobs = jobs.each_with_object({}) do |j, h|
        # heuristic: put untimed jobs in the middle of the queue
        puts "New/untimed job: #{j}" if timings[j].nil?
        h[j] = timings[j] || default_timing
      end

      # finally, sort them based on their timing (slowest first)
      jobs = jobs.sort_by { |_j, t| -t }.map(&:first)

      puts "Published queue (size=#{queue.publish(jobs)})"
    end

    # NOTE: RSpec has to load the files before we can split them as individual
    # examples. In case a file to be split fails to load
    # (e.g. it contains a syntax error), we return the slow files unchanged,
    # thereby falling back to scheduling them normally.
    #
    # Their errors will be reported in the normal flow, when they're picked up
    # as jobs by a worker.
    def files_to_example_ids(files)
      # TODO: do this programmatically
      cmd = "DISABLE_SPRING=1 bin/rspec --dry-run --format json #{files.join(' ')}"
      out = `#{cmd}`

      if !$?.success?
        # TODO: emit warning to Sentry
        puts "WARNING: Error splitting slow files; falling back to regular scheduling:"

        begin
          pp JSON.parse(out)
        rescue JSON::ParserError
          puts out
        end
        puts

        return files
      end

      JSON.parse(out)["examples"].map { |e| e["id"] }
    end

    def relative_path(job)
      @cwd ||= Pathname.new(Dir.pwd)
      "./#{Pathname.new(job).relative_path_from(@cwd)}"
    end

    def elapsed(since)
      Process.clock_gettime(Process::CLOCK_MONOTONIC) - since
    end
  end
end

@@ -0,0 +1,18 @@
require_relative "lib/rspecq/version"

Gem::Specification.new do |s|
  s.name = "rspecq"
  s.version = RSpecQ::VERSION
  s.summary = "Distribute an RSpec suite among many workers"
  s.authors = "Agis Anastasopoulos"
  s.email = "agis.anast@gmail.com"
  s.files = Dir["lib/**/*", "CHANGELOG.md", "LICENSE", "Rakefile", "README.md"]
  s.executables << "rspecq"
  s.homepage = "https://github.com/skroutz/rspecq"
  s.license = "MIT"

  s.add_dependency "rspec-core"

  s.add_development_dependency "minitest", "~> 5.14"
  s.add_development_dependency "rake"
end