send delayed jobs stats to statsd

closes CNVS-12739

log timing for pops, time spent in queue, time to perform job,
sliced by tag (where applicable), shard, and job shard.

test plan:
 * configure statsd.yml to go to 127.0.0.1 port 7856
 * nc -lu 127.0.0.1 7856
 * run jobs
 * you should see lots of job timings sent to netcat

Change-Id: I60a4b6b10d30edd96011e8e8bc8aa6104dfc6daa
Reviewed-on: https://gerrit.instructure.com/34724
Tested-by: Jenkins <jenkins@instructure.com>
Reviewed-by: Brian Palmer <brianp@instructure.com>
Product-Review: Cody Cutrer <cody@instructure.com>
QA-Review: Cody Cutrer <cody@instructure.com>
This commit is contained in:
Cody Cutrer 2014-05-12 13:03:59 -06:00
parent 01190e3e4a
commit 5d4d842ef2
5 changed files with 33 additions and 7 deletions

View File

@ -37,8 +37,17 @@ end
Delayed::Worker.lifecycle.around(:perform) do |worker, job, &block|
starting_mem = Canvas.sample_memory()
starting_cpu = Process.times()
lag = ((Time.now - job.created_at) * 1000).round
tag = CanvasStatsd::Statsd.escape(job.tag)
stats = ["delayedjob.queue", "delayedjob.queue.tag.#{tag}", "delayedjob.queue.shard.#{job.current_shard.id}"]
stats << "delayedjob.queue.jobshard.#{job.shard.id}" if job.respond_to?(:shard)
CanvasStatsd::Statsd.timing(stats, lag)
begin
stats = ["delayedjob.perform", "delayedjob.perform.tag.#{tag}", "delayedjob.perform.shard.#{job.current_shard.id}"]
stats << "delayedjob.perform.jobshard.#{job.shard.id}" if job.respond_to?(:shard)
CanvasStatsd::Statsd.time(stats) do
block.call(worker, job)
end
ensure
ending_cpu = Process.times()
ending_mem = Canvas.sample_memory()
@ -48,3 +57,9 @@ Delayed::Worker.lifecycle.around(:perform) do |worker, job, &block|
Rails.logger.info "[STAT] #{starting_mem} #{ending_mem} #{ending_mem - starting_mem} #{user_cpu} #{system_cpu}"
end
end
Delayed::Worker.lifecycle.around(:pop) do |worker, &block|
CanvasStatsd::Statsd.time(["delayedjob.pop", "delayedjob.pop.jobshard.#{Shard.current(:delayed_job)}"]) do
block.call(worker)
end
end

View File

@ -50,6 +50,13 @@ module CanvasStatsd
class_eval <<-RUBY, __FILE__, __LINE__+1
def self.#{method}(stat, *args)
if self.instance
if Array === stat
stat.each do |st|
self.#{method}(st, *args)
end
return
end
if self.append_hostname?
stat_name = "\#{stat}.\#{hostname}"
else

View File

@ -42,7 +42,7 @@ class Shard
default
end
def self.current
def self.current(category = :default)
default
end

View File

@ -4,6 +4,7 @@ module Delayed
class Lifecycle
EVENTS = {
:perform => [:worker, :job],
:pop => [:worker],
}
def initialize

View File

@ -93,11 +93,14 @@ class Worker
@sleep_delay_stagger ||= Setting.get('delayed_jobs_sleep_delay_stagger', '2.5').to_f
@make_tmpdir ||= Setting.get('delayed_jobs_unique_tmpdir', 'true') == 'true'
job = Delayed::Job.get_and_lock_next_available(
job =
self.class.lifecycle.run_callbacks(:pop, self) do
Delayed::Job.get_and_lock_next_available(
name,
queue,
min_priority,
max_priority)
end
if job
configure_for_job(job) do