commit 9e386d9ff6af95db165bbb0da77e75ce474ddebc Author: Agis Anastasopoulos Date: Thu Jun 25 23:04:02 2020 +0300 Initial commit diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..4498166 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,4 @@ +# Changelog + +## master (unreleased) + diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..43fbf80 --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +The MIT License + +Copyright (c) 2020 Skroutz S.A. + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..c8e9537 --- /dev/null +++ b/README.md @@ -0,0 +1,103 @@ +# RSpecQ + +RSpecQ (`rspecq`) distributes and executes an RSpec suite over many workers, +using a centralized queue backed by Redis. + +RSpecQ is heavily inspired by [test-queue](https://github.com/tmm1/test-queue) +and [ci-queue](https://github.com/Shopify/ci-queue). + +## Why don't you just use ci-queue? 
+
+While evaluating ci-queue for our RSpec suite, we observed slow boot times
+in the workers (up to 3 minutes), increased memory consumption and excessive
+disk I/O on boot. This is because a ci-queue worker has to load every spec
+file on boot, which can be problematic for applications with a large number
+of spec files.
+
+RSpecQ works with spec files as its unit of work (as opposed to ci-queue which
+works with individual examples). This means that an RSpecQ worker does not
+have to load all spec files at once and therefore avoids the aforementioned
+problems. It also allows suites to keep using `before(:all)` hooks
+(which ci-queue explicitly rejects). (Note: RSpecQ also schedules individual
+examples, but only when this is deemed necessary; see the "Spec file
+splitting" section.)
+
+We also observed faster build times by scheduling spec files instead of
+individual examples, since far fewer Redis operations are involved.
+
+The downside of this design is that it's more complicated, since the scheduling
+of spec files happens based on timings calculated from previous runs. This
+means that RSpecQ maintains a key with the timing of each job and updates it
+on every run (if the `--timings` option was used). Also, RSpecQ has a "slow
+file threshold" which currently has to be set manually (but this can be
+improved).
+
+*Update*: ci-queue deprecated support for RSpec, so there's that.
+
+## Usage
+
+Each worker needs to know the build it will participate in, its name and where
+Redis is located. To start a worker:
+
+```shell
+$ rspecq --build-id=foo --worker-id=worker1 --redis=redis://localhost
+```
+
+To view the progress of the build, use `--report`:
+
+```shell
+$ rspecq --build-id=foo --worker-id=reporter --redis=redis://localhost --report
+```
+
+For detailed info use `--help`.
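
The timing-based ordering mentioned above can be sketched in plain Ruby. This is a standalone illustration, not RSpecQ's public API: `order_jobs` is a hypothetical helper that mirrors the ordering logic of `Worker#try_publish_queue!` shown later in this commit (known timings sort slowest-first; untimed files get the median timing, landing mid-queue).

```ruby
# Standalone sketch (hypothetical helper, not part of RSpecQ's API) of
# the timing-based scheduling: files with a known timing are scheduled
# slowest-first, while new/untimed files are assigned the median timing,
# which places them roughly in the middle of the queue.
def order_jobs(files, timings)
  # no historical data: publish in random order
  return files.shuffle if timings.empty?

  # median duration, used as the default for untimed files
  default = timings.values.sort[timings.size / 2]

  files.map { |f| [f, timings[f] || default] }
       .sort_by { |_f, duration| -duration }
       .map(&:first)
end

timings = { "a_spec.rb" => 12.3, "b_spec.rb" => 1.1, "c_spec.rb" => 4.4 }
order_jobs(%w[a_spec.rb b_spec.rb c_spec.rb new_spec.rb], timings)
# slowest file comes first; the fastest (b_spec.rb) ends up last
```

Publishing slowest-first means the most expensive jobs start as early as possible, which minimizes the chance that one straggler file delays the whole build at the end.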
+
+
+## How it works
+
+The basic idea is identical to ci-queue's, so please refer to its README.
+
+### Terminology
+
+- Job: the smallest unit of work, which is usually a spec file
+  (e.g. `./spec/models/foo_spec.rb`) but can also be an individual example
+  (e.g. `./spec/models/foo_spec.rb[1:2:1]`) if the file is too slow.
+- Queue: a collection of Redis-backed structures that hold all the necessary
+  information for RSpecQ to function. This includes timing statistics, jobs to
+  be executed, the failure reports, requeueing statistics and more.
+- Worker: a process that, given a build id, pops jobs of that build off the
+  queue and executes them using RSpec.
+- Reporter: a process that, given a build id, waits for the build to finish
+  and prints the summary report (examples executed, build result, failures etc.)
+
+### Spec file splitting
+
+Very slow files may limit how fast the suite can execute. For example,
+a worker may spend 10 minutes running a single slow file, while all the other
+workers finish after 8 minutes. To overcome this issue, RSpecQ splits
+files whose execution time is above a certain threshold
+(set with the `--file-split-threshold` option) and instead schedules them as
+individual examples.
+
+In the future, we'd like the slow file threshold to be calculated and set
+dynamically.
+
+### Requeues
+
+As a mitigation measure for flaky tests, if an example fails it is put
+back to the queue to be picked up by another worker. This is repeated up
+to a certain number of times, after which the example is considered a
+legitimate failure and is printed in the final report (`--report`).
+
+### Worker failures
+
+Workers emit a timestamp after each example, as a heartbeat, to denote
+that they're alive and performing jobs. If a worker hasn't reported for
+a given amount of time (see `WORKER_LIVENESS_SEC`), it is considered dead
+and the job it had reserved is requeued, so that it is picked up by another worker.
+ +This protects us against unrecoverable worker failures (e.g. segfault). + +## License + +RSpecQ is licensed under MIT. See [LICENSE](LICENSE). diff --git a/bin/rspecq b/bin/rspecq new file mode 100755 index 0000000..db1b018 --- /dev/null +++ b/bin/rspecq @@ -0,0 +1,67 @@ +#!/usr/bin/env ruby +require "optionparser" +require "rspecq" + +opts = {} +OptionParser.new do |o| + o.banner = "Usage: #{$PROGRAM_NAME} [opts] [files_or_directories_to_run]" + + o.on("--build-id ID", "A unique identifier denoting the build") do |v| + opts[:build_id] = v + end + + o.on("--worker-id ID", "A unique identifier denoting the worker") do |v| + opts[:worker_id] = v + end + + o.on("--redis HOST", "Redis HOST to connect to (default: 127.0.0.1)") do |v| + opts[:redis_host] = v || "127.0.0.1" + end + + o.on("--timings", "Populate global job timings in Redis") do |v| + opts[:timings] = v + end + + o.on("--file-split-threshold N", "Split spec files slower than N sec. and " \ + "schedule them by example (default: 999999)") do |v| + opts[:file_split_threshold] = Float(v) + end + + o.on("--report", "Do not execute tests but wait until queue is empty and " \ + "print a report") do |v| + opts[:report] = v + end + + o.on("--report-timeout N", Integer, "Fail if queue is not empty after " \ + "N seconds. Only applicable if --report is enabled " \ + "(default: 3600)") do |v| + opts[:report_timeout] = v + end + +end.parse! + +[:build_id, :worker_id].each do |o| + raise OptionParser::MissingArgument.new(o) if opts[o].nil? 
+end
+
+if opts[:report]
+  reporter = RSpecQ::Reporter.new(
+    build_id: opts[:build_id],
+    worker_id: opts[:worker_id],
+    timeout: opts[:report_timeout] || 3600,
+    redis_host: opts[:redis_host],
+  )
+
+  reporter.report
+else
+  worker = RSpecQ::Worker.new(
+    build_id: opts[:build_id],
+    worker_id: opts[:worker_id],
+    redis_host: opts[:redis_host],
+    files_or_dirs_to_run: ARGV[0] || "spec",
+  )
+
+  worker.populate_timings = opts[:timings]
+  worker.file_split_threshold = opts[:file_split_threshold] || 999999
+  worker.work
+end
diff --git a/lib/rspecq.rb b/lib/rspecq.rb
new file mode 100644
index 0000000..11e14ad
--- /dev/null
+++ b/lib/rspecq.rb
@@ -0,0 +1,21 @@
+require "rspec/core"
+
+module RSpecQ
+  MAX_REQUEUES = 3
+
+  # If a worker hasn't executed an RSpec example for more than this time
+  # (in seconds), it is considered dead and its reserved work will be put back
+  # to the queue, to be picked up by another worker.
+  WORKER_LIVENESS_SEC = 60.0
+end
+
+require_relative "rspecq/formatters/example_count_recorder"
+require_relative "rspecq/formatters/failure_recorder"
+require_relative "rspecq/formatters/job_timing_recorder"
+require_relative "rspecq/formatters/worker_heartbeat_recorder"
+
+require_relative "rspecq/queue"
+require_relative "rspecq/reporter"
+require_relative "rspecq/worker"
+
+require_relative "rspecq/version"
diff --git a/lib/rspecq/formatters/example_count_recorder.rb b/lib/rspecq/formatters/example_count_recorder.rb
new file mode 100644
index 0000000..4fde439
--- /dev/null
+++ b/lib/rspecq/formatters/example_count_recorder.rb
@@ -0,0 +1,15 @@
+module RSpecQ
+  module Formatters
+    # Increments the example counter after each job.
+    class ExampleCountRecorder
+      def initialize(queue)
+        @queue = queue
+      end
+
+      def dump_summary(summary)
+        n = summary.examples.count
+        @queue.increment_example_count(n) if n > 0
+      end
+    end
+  end
+end
diff --git a/lib/rspecq/formatters/failure_recorder.rb b/lib/rspecq/formatters/failure_recorder.rb
new file mode 100644
index 0000000..91f27c9
--- /dev/null
+++ b/lib/rspecq/formatters/failure_recorder.rb
@@ -0,0 +1,50 @@
+module RSpecQ
+  module Formatters
+    class FailureRecorder
+      def initialize(queue, job)
+        @queue = queue
+        @job = job
+        @colorizer = RSpec::Core::Formatters::ConsoleCodes
+        @non_example_error_recorded = false
+      end
+
+      # Here we're notified about errors occurring outside of examples.
+      #
+      # NOTE: Upon such an error, RSpec emits multiple notifications but we only
+      # want the _first_, which is the one that contains the error backtrace.
+      # That's why we have to keep track of whether we've already received the
+      # needed notification and act accordingly.
+      def message(n)
+        if RSpec.world.non_example_failure && !@non_example_error_recorded
+          @queue.record_non_example_error(@job, n.message)
+          @non_example_error_recorded = true
+        end
+      end
+
+      def example_failed(notification)
+        example = notification.example
+
+        if @queue.requeue_job(example.id, MAX_REQUEUES)
+          # HACK: try to avoid picking the job we just requeued; we want it
+          # to be picked up by a different worker
+          sleep 0.5
+          return
+        end
+
+        presenter = RSpec::Core::Formatters::ExceptionPresenter.new(
+          example.exception, example)
+
+        msg = presenter.fully_formatted(nil, @colorizer)
+        msg << "\n"
+        msg << @colorizer.wrap(
+          "bin/rspec #{example.location_rerun_argument}",
+          RSpec.configuration.failure_color)
+
+        msg << @colorizer.wrap(
+          "  # #{example.full_description}", RSpec.configuration.detail_color)
+
+        @queue.record_example_failure(notification.example.id, msg)
+      end
+    end
+  end
+end
diff --git a/lib/rspecq/formatters/job_timing_recorder.rb b/lib/rspecq/formatters/job_timing_recorder.rb
new file
mode 100644 index 0000000..fdac7e6 --- /dev/null +++ b/lib/rspecq/formatters/job_timing_recorder.rb @@ -0,0 +1,14 @@ +module RSpecQ + module Formatters + class JobTimingRecorder + def initialize(queue, job) + @queue = queue + @job = job + end + + def dump_summary(summary) + @queue.record_timing(@job, Float(summary.duration)) + end + end + end +end diff --git a/lib/rspecq/formatters/worker_heartbeat_recorder.rb b/lib/rspecq/formatters/worker_heartbeat_recorder.rb new file mode 100644 index 0000000..d97c1a5 --- /dev/null +++ b/lib/rspecq/formatters/worker_heartbeat_recorder.rb @@ -0,0 +1,17 @@ +module RSpecQ + module Formatters + # Updates the respective heartbeat key of the worker after each example. + # + # Refer to the documentation of WORKER_LIVENESS_SEC for more info. + class WorkerHeartbeatRecorder + def initialize(worker) + @worker = worker + end + + def example_finished(*) + @worker.update_heartbeat + end + end + end +end + diff --git a/lib/rspecq/queue.rb b/lib/rspecq/queue.rb new file mode 100644 index 0000000..f1c09aa --- /dev/null +++ b/lib/rspecq/queue.rb @@ -0,0 +1,288 @@ +require "redis" + +module RSpecQ + class Queue + RESERVE_JOB = <<~LUA.freeze + local queue = KEYS[1] + local queue_running = KEYS[2] + local worker_id = ARGV[1] + + local job = redis.call('lpop', queue) + if job then + redis.call('hset', queue_running, worker_id, job) + return job + else + return nil + end + LUA + + # Scans for dead workers and puts their reserved jobs back to the queue. 
+    REQUEUE_LOST_JOB = <<~LUA.freeze
+      local worker_heartbeats = KEYS[1]
+      local queue_running = KEYS[2]
+      local queue_unprocessed = KEYS[3]
+      local time_now = ARGV[1]
+      local timeout = ARGV[2]
+
+      local dead_workers = redis.call('zrangebyscore', worker_heartbeats, 0, time_now - timeout)
+      for _, worker in ipairs(dead_workers) do
+        local job = redis.call('hget', queue_running, worker)
+        if job then
+          redis.call('lpush', queue_unprocessed, job)
+          redis.call('hdel', queue_running, worker)
+          return job
+        end
+      end
+
+      return nil
+    LUA
+
+    REQUEUE_JOB = <<~LUA.freeze
+      local key_queue_unprocessed = KEYS[1]
+      local key_requeues = KEYS[2]
+      local job = ARGV[1]
+      local max_requeues = ARGV[2]
+
+      -- NOTE: hget and ARGV both yield strings; convert to numbers so the
+      -- comparison is numeric rather than lexicographic
+      local requeued_times = redis.call('hget', key_requeues, job)
+      if requeued_times and tonumber(requeued_times) >= tonumber(max_requeues) then
+        return nil
+      end
+
+      redis.call('lpush', key_queue_unprocessed, job)
+      redis.call('hincrby', key_requeues, job, 1)
+
+      return true
+    LUA
+
+    STATUS_INITIALIZING = "initializing".freeze
+    STATUS_READY = "ready".freeze
+
+    def initialize(build_id, worker_id, redis_host)
+      @build_id = build_id
+      @worker_id = worker_id
+      @redis = Redis.new(host: redis_host, id: worker_id)
+    end
+
+    # NOTE: jobs will be processed from head to tail (lpop)
+    def publish(jobs)
+      @redis.multi do
+        @redis.rpush(key_queue_unprocessed, jobs)
+        @redis.set(key_queue_status, STATUS_READY)
+      end.first
+    end
+
+    def reserve_job
+      @redis.eval(
+        RESERVE_JOB,
+        keys: [
+          key_queue_unprocessed,
+          key_queue_running,
+        ],
+        argv: [@worker_id]
+      )
+    end
+
+    def requeue_lost_job
+      @redis.eval(
+        REQUEUE_LOST_JOB,
+        keys: [
+          key_worker_heartbeats,
+          key_queue_running,
+          key_queue_unprocessed
+        ],
+        argv: [
+          current_time,
+          WORKER_LIVENESS_SEC
+        ]
+      )
+    end
+
+    # NOTE: The same job might happen to be acknowledged more than once, in
+    # the case of requeues.
+    def acknowledge_job(job)
+      @redis.multi do
+        @redis.hdel(key_queue_running, @worker_id)
+        @redis.sadd(key_queue_processed, job)
+      end
+    end
+
+    # Put job at the head of the queue to be re-processed right after, by
+    # another worker. This is a mitigation measure against flaky tests.
+    #
+    # Returns nil if the job hit the requeue limit and therefore was not
+    # requeued and should be considered a failure.
+    def requeue_job(job, max_requeues)
+      return false if max_requeues.zero?
+
+      @redis.eval(
+        REQUEUE_JOB,
+        keys: [key_queue_unprocessed, key_requeues],
+        argv: [job, max_requeues],
+      )
+    end
+
+    def record_example_failure(example_id, message)
+      @redis.hset(key_failures, example_id, message)
+    end
+
+    # For errors that occurred outside of examples (e.g. while loading a spec file)
+    def record_non_example_error(job, message)
+      @redis.hset(key_errors, job, message)
+    end
+
+    def record_timing(job, duration)
+      @redis.zadd(key_timings, duration, job)
+    end
+
+    def record_build_time(duration)
+      @redis.multi do
+        @redis.lpush(key_build_times, Float(duration))
+        @redis.ltrim(key_build_times, 0, 99)
+      end
+    end
+
+    def record_worker_heartbeat
+      @redis.zadd(key_worker_heartbeats, current_time, @worker_id)
+    end
+
+    def increment_example_count(n)
+      @redis.incrby(key_example_count, n)
+    end
+
+    def example_count
+      @redis.get(key_example_count) || 0
+    end
+
+    def processed_jobs_count
+      @redis.scard(key_queue_processed)
+    end
+
+    def become_master
+      @redis.setnx(key_queue_status, STATUS_INITIALIZING)
+    end
+
+    # ordered by execution time desc (slowest at the head)
+    def timings
+      Hash[@redis.zrevrange(key_timings, 0, -1, withscores: true)]
+    end
+
+    def example_failures
+      @redis.hgetall(key_failures)
+    end
+
+    def non_example_errors
+      @redis.hgetall(key_errors)
+    end
+
+    def exhausted?
+      return false if !published?
+
+      @redis.multi do
+        @redis.llen(key_queue_unprocessed)
+        @redis.hlen(key_queue_running)
+      end.inject(:+).zero?
+    end
+
+    def published?
+      @redis.get(key_queue_status) == STATUS_READY
+    end
+
+    def wait_until_published(timeout=30)
+      (timeout * 10).times do
+        return if published?
+        sleep 0.1
+      end
+
+      raise "Queue not yet published after #{timeout} seconds"
+    end
+
+    def build_successful?
+      exhausted? && example_failures.empty? && non_example_errors.empty?
+    end
+
+    private
+
+    def key(*keys)
+      [@build_id, keys].join(":")
+    end
+
+    # redis: STRING [STATUS_INITIALIZING, STATUS_READY]
+    def key_queue_status
+      key("queue", "status")
+    end
+
+    # redis: LIST<job>
+    def key_queue_unprocessed
+      key("queue", "unprocessed")
+    end
+
+    # redis: HASH<worker_id, job>
+    def key_queue_running
+      key("queue", "running")
+    end
+
+    # redis: SET<job>
+    def key_queue_processed
+      key("queue", "processed")
+    end
+
+    # Contains regular RSpec example failures.
+    #
+    # redis: HASH<example_id, error message>
+    def key_failures
+      key("example_failures")
+    end
+
+    # Contains errors raised outside of RSpec examples
+    # (e.g. a syntax error in spec_helper.rb).
+    #
+    # redis: HASH<job, error message>
+    def key_errors
+      key("errors")
+    end
+
+    # As a mitigation mechanism for flaky tests, we requeue example failures
+    # to be retried by another worker, up to a certain number of times.
+    #
+    # redis: HASH<job, times_retried>
+    def key_requeues
+      key("requeues")
+    end
+
+    # The total number of examples, including those that were requeued.
+    #
+    # redis: STRING<integer>
+    def key_example_count
+      key("example_count")
+    end
+
+    # redis: ZSET<worker_id, timestamp>
+    #
+    # Timestamp of the last example processed by each worker.
+    def key_worker_heartbeats
+      key("worker_heartbeats")
+    end
+
+    # redis: ZSET<job, duration>
+    #
+    # NOTE: This key is not scoped to a build (i.e. shared among all builds),
+    # so be careful to only publish timings from a single branch (e.g. master).
+    # Otherwise, timings won't be accurate.
+    def key_timings
+      "timings"
+    end
+
+    # redis: LIST<duration>
+    #
+    # Last build is at the head of the list.
+    def key_build_times
+      "build_times"
+    end
+
+    # We don't use any Ruby `Time` methods because specs that use timecop in
+    # before(:all) hooks will mess up our times.
+    def current_time
+      @redis.time[0]
+    end
+  end
+end
diff --git a/lib/rspecq/reporter.rb b/lib/rspecq/reporter.rb
new file mode 100644
index 0000000..ff29cd3
--- /dev/null
+++ b/lib/rspecq/reporter.rb
@@ -0,0 +1,95 @@
+module RSpecQ
+  class Reporter
+    def initialize(build_id:, worker_id:, timeout:, redis_host:)
+      @build_id = build_id
+      @worker_id = worker_id
+      @timeout = timeout
+      @queue = Queue.new(build_id, worker_id, redis_host)
+
+      # We want feedback to be immediately printed to CI users, so
+      # we disable buffering.
+      STDOUT.sync = true
+    end
+
+    def report
+      t = measure_duration { @queue.wait_until_published }
+
+      finished = false
+
+      reported_failures = {}
+      failure_heading_printed = false
+
+      tests_duration = measure_duration do
+        @timeout.times do |i|
+          @queue.example_failures.each do |job, rspec_output|
+            next if reported_failures[job]
+
+            if !failure_heading_printed
+              puts "\nFailures:\n"
+              failure_heading_printed = true
+            end
+
+            reported_failures[job] = true
+            puts failure_formatted(rspec_output)
+          end
+
+          if !@queue.exhausted?
+            sleep 1
+            next
+          end
+
+          finished = true
+          break
+        end
+      end
+
+      raise "Build not finished after #{@timeout} seconds" if !finished
+
+      @queue.record_build_time(tests_duration)
+      puts summary(@queue.example_failures, @queue.non_example_errors,
+                   humanize_duration(tests_duration))
+
+      exit 1 if !@queue.build_successful?
+ end + + private + + def measure_duration + start = Process.clock_gettime(Process::CLOCK_MONOTONIC) + yield + (Process.clock_gettime(Process::CLOCK_MONOTONIC) - start).round(2) + end + + # We try to keep this output consistent with RSpec's original output + def summary(failures, errors, duration) + failed_examples_section = "\nFailed examples:\n\n" + + failures.each do |_job, msg| + parts = msg.split("\n") + failed_examples_section << " #{parts[-1]}\n" + end + + summary = "" + summary << failed_examples_section if !failures.empty? + + errors.each { |_job, msg| summary << msg } + + summary << "\n" + summary << "Total results:\n" + summary << " #{@queue.example_count} examples " \ + "(#{@queue.processed_jobs_count} jobs), " \ + "#{failures.count} failures, " \ + "#{errors.count} errors" + summary << "\n\n" + summary << "Spec execution time: #{duration}" + end + + def failure_formatted(rspec_output) + rspec_output.split("\n")[0..-2].join("\n") + end + + def humanize_duration(seconds) + Time.at(seconds).utc.strftime("%H:%M:%S") + end + end +end diff --git a/lib/rspecq/version.rb b/lib/rspecq/version.rb new file mode 100644 index 0000000..fc88cca --- /dev/null +++ b/lib/rspecq/version.rb @@ -0,0 +1,3 @@ +module RSpecQ + VERSION = "0.0.1.pre1".freeze +end diff --git a/lib/rspecq/worker.rb b/lib/rspecq/worker.rb new file mode 100644 index 0000000..ac50fb8 --- /dev/null +++ b/lib/rspecq/worker.rb @@ -0,0 +1,185 @@ +require "json" +require "pp" + +module RSpecQ + class Worker + HEARTBEAT_FREQUENCY = WORKER_LIVENESS_SEC / 6 + + # If true, job timings will be populated in the global Redis timings key + # + # Defaults to false + attr_accessor :populate_timings + + # If set, spec files that are known to take more than this value to finish, + # will be split and scheduled on a per-example basis. 
+ attr_accessor :file_split_threshold + + def initialize(build_id:, worker_id:, redis_host:, files_or_dirs_to_run:) + @build_id = build_id + @worker_id = worker_id + @queue = Queue.new(build_id, worker_id, redis_host) + @files_or_dirs_to_run = files_or_dirs_to_run + @populate_timings = false + @file_split_threshold = 999999 + + RSpec::Core::Formatters.register(Formatters::JobTimingRecorder, :dump_summary) + RSpec::Core::Formatters.register(Formatters::ExampleCountRecorder, :dump_summary) + RSpec::Core::Formatters.register(Formatters::FailureRecorder, :example_failed, :message) + RSpec::Core::Formatters.register(Formatters::WorkerHeartbeatRecorder, :example_finished) + end + + def work + puts "Working for build #{@build_id} (worker=#{@worker_id})" + + try_publish_queue!(@queue) + @queue.wait_until_published + + loop do + # we have to bootstrap this so that it can be used in the first call + # to `requeue_lost_job` inside the work loop + update_heartbeat + + lost = @queue.requeue_lost_job + puts "Requeued lost job: #{lost}" if lost + + # TODO: can we make `reserve_job` also act like exhausted? and get + # rid of `exhausted?` (i.e. return false if no jobs remain) + job = @queue.reserve_job + + # build is finished + return if job.nil? && @queue.exhausted? + + next if job.nil? + + puts + puts "Executing #{job}" + + reset_rspec_state! 
+
+        # reconfigure rspec
+        RSpec.configuration.detail_color = :magenta
+        RSpec.configuration.seed = srand && srand % 0xFFFF
+        RSpec.configuration.backtrace_formatter.filter_gem('rspecq')
+        RSpec.configuration.add_formatter(Formatters::FailureRecorder.new(@queue, job))
+        RSpec.configuration.add_formatter(Formatters::ExampleCountRecorder.new(@queue))
+        RSpec.configuration.add_formatter(Formatters::WorkerHeartbeatRecorder.new(self))
+
+        if populate_timings
+          RSpec.configuration.add_formatter(Formatters::JobTimingRecorder.new(@queue, job))
+        end
+
+        opts = RSpec::Core::ConfigurationOptions.new(["--format", "progress", job])
+        _result = RSpec::Core::Runner.new(opts).run($stderr, $stdout)
+
+        @queue.acknowledge_job(job)
+      end
+    end
+
+    # Update the worker heartbeat if necessary
+    def update_heartbeat
+      if @heartbeat_updated_at.nil? || elapsed(@heartbeat_updated_at) >= HEARTBEAT_FREQUENCY
+        @queue.record_worker_heartbeat
+        @heartbeat_updated_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
+      end
+    end
+
+    private
+
+    def reset_rspec_state!
+      RSpec.clear_examples
+
+      # TODO: remove after https://github.com/rspec/rspec-core/pull/2723
+      RSpec.world.instance_variable_set(:@example_group_counts_by_spec_file, Hash.new(0))
+
+      # RSpec.clear_examples does not reset those, which causes issues when
+      # a non-example error occurs (subsequent jobs are not executed)
+      # TODO: upstream
+      RSpec.world.non_example_failure = false
+
+      # we don't want an error that occurred outside of the examples (which
+      # would set this to `true`) to stop the worker
+      RSpec.world.wants_to_quit = false
+    end
+
+    def try_publish_queue!(queue)
+      return if !queue.become_master
+
+      RSpec.configuration.files_or_directories_to_run = @files_or_dirs_to_run
+      files_to_run = RSpec.configuration.files_to_run.map { |j| relative_path(j) }
+
+      timings = queue.timings
+      if timings.empty?
+        # TODO: should be a warning reported somewhere (Sentry?)
+        q_size = queue.publish(files_to_run.shuffle)
+        puts "WARNING: No timings found! Published queue in " \
+             "random order (size=#{q_size})"
+        return
+      end
+
+      slow_files = timings.take_while do |_job, duration|
+        duration >= file_split_threshold
+      end.map(&:first) & files_to_run
+
+      if slow_files.any?
+        puts "Slow files (threshold=#{file_split_threshold}): #{slow_files}"
+      end
+
+      # prepare jobs to run
+      jobs = []
+      jobs.concat(files_to_run - slow_files)
+      jobs.concat(files_to_example_ids(slow_files)) if slow_files.any?
+
+      # assign timings to all of them
+      default_timing = timings.values[timings.values.size/2]
+
+      jobs = jobs.each_with_object({}) do |j, h|
+        # heuristic: put untimed jobs in the middle of the queue
+        puts "New/untimed job: #{j}" if timings[j].nil?
+        h[j] = timings[j] || default_timing
+      end
+
+      # finally, sort them based on their timing (slowest first)
+      jobs = jobs.sort_by { |_j, t| -t }.map(&:first)
+
+      puts "Published queue (size=#{queue.publish(jobs)})"
+    end
+
+    # NOTE: RSpec has to load the files before we can split them as individual
+    # examples. In case a file to be split fails to load
+    # (e.g. contains a syntax error), we return the slow files unchanged,
+    # thereby falling back to scheduling them normally.
+    #
+    # Their errors will be reported in the normal flow, when they're picked up
+    # as jobs by a worker.
+    def files_to_example_ids(files)
+      # TODO: do this programmatically
+      cmd = "DISABLE_SPRING=1 bin/rspec --dry-run --format json #{files.join(' ')}"
+      out = `#{cmd}`
+
+      if !$?.success?
+ # TODO: emit warning to Sentry + puts "WARNING: Error splitting slow files; falling back to regular scheduling:" + + begin + pp JSON.parse(out) + rescue JSON::ParserError + puts out + end + puts + + return files + end + + JSON.parse(out)["examples"].map { |e| e["id"] } + end + + def relative_path(job) + @cwd ||= Pathname.new(Dir.pwd) + "./#{Pathname.new(job).relative_path_from(@cwd)}" + end + + def elapsed(since) + Process.clock_gettime(Process::CLOCK_MONOTONIC) - since + end + end +end diff --git a/rspecq.gemspec b/rspecq.gemspec new file mode 100644 index 0000000..d262e95 --- /dev/null +++ b/rspecq.gemspec @@ -0,0 +1,18 @@ +require_relative "lib/rspecq/version" + +Gem::Specification.new do |s| + s.name = "rspecq" + s.version = RSpecQ::VERSION + s.summary = "Distribute an RSpec suite among many workers" + s.authors = "Agis Anastasopoulos" + s.email = "agis.anast@gmail.com" + s.files = Dir["lib/**/*", "CHANGELOG.md", "LICENSE", "Rakefile", "README.md"] + s.executables << "rspecq" + s.homepage = "https://github.com/skroutz/rspecq" + s.license = "MIT" + + s.add_dependency "rspec-core" + + s.add_development_dependency "minitest", "~> 5.14" + s.add_development_dependency "rake" +end
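
As a closing illustration, the requeue limit enforced by the `REQUEUE_JOB` Lua script above can be sketched in plain Ruby, with an in-memory Array and Hash standing in for the Redis list and hash (an assumption for illustration only: single-threaded use, so the script's atomicity isn't needed).

```ruby
# In-memory sketch of the REQUEUE_JOB Lua script: `unprocessed` stands in
# for the unprocessed LIST and `requeues` for the requeues HASH. Returns
# nil once the job has hit the requeue limit, i.e. it's a legit failure.
def requeue_job(unprocessed, requeues, job, max_requeues)
  return nil if requeues.fetch(job, 0) >= max_requeues

  unprocessed.unshift(job)                    # lpush: requeue at the head
  requeues[job] = requeues.fetch(job, 0) + 1  # hincrby: count the requeue
  true
end

unprocessed = []
requeues = {}
job = "./spec/foo_spec.rb[1:1]"
3.times { requeue_job(unprocessed, requeues, job, 3) }
requeue_job(unprocessed, requeues, job, 3) # => nil, requeue limit reached
```

Putting the job back at the head (rather than the tail) means it is re-run as soon as possible, while the 0.5s sleep in `FailureRecorder#example_failed` nudges a different worker into picking it up.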