|
| 1 | +# frozen_string_literal: true |
| 2 | + |
| 3 | +# Copyright (c) 2019-present, BigCommerce Pty. Ltd. All rights reserved |
| 4 | +# |
| 5 | +# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated |
| 6 | +# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the |
| 7 | +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit |
| 8 | +# persons to whom the Software is furnished to do so, subject to the following conditions: |
| 9 | +# |
| 10 | +# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the |
| 11 | +# Software. |
| 12 | +# |
| 13 | +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE |
| 14 | +# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR |
| 15 | +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR |
| 16 | +# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
| 17 | +# |
| 18 | +module Bigcommerce |
| 19 | + module Prometheus |
| 20 | + module Integrations |
| 21 | + class Resque |
| 22 | + ## |
| 23 | + # Per-Resque-job histogram metrics, recorded from the parent worker process. |
| 24 | + # Hooked via a prepend around Resque::Worker#perform_with_fork. |
| 25 | + # Queue latency is captured before super, perform duration after. |
| 26 | + # |
| 27 | + # Off unless PROMETHEUS_RESQUE_PER_JOB_METRICS_ENABLED=1 |
| 28 | + # Emits one histogram observation per job per worker process, which can be high cardinality at scale. |
| 29 | + # |
| 30 | +# NOTE: queue_latency is supported for jobs enqueued via ActiveJob. |
| 31 | + # The gem reads three fields from |
| 32 | + # `payload['args'][0]` (which must be a Hash): |
| 33 | + # |
| 34 | + # * job_class — the user's actual job class name; used as the |
| 35 | + # metric label. |
| 36 | + # * enqueued_at — ISO 8601 string; used as the queue-latency |
| 37 | + # anchor when scheduled_at is absent. |
| 38 | + # * scheduled_at — ISO 8601 string; preferred over enqueued_at |
| 39 | + # when present (e.g. retries-with-backoff, so |
| 40 | + # the intentional wait isn't counted as latency). |
| 41 | + # |
| 42 | + # ActiveJob produces this shape natively — the payload is wrapped by |
| 43 | + # ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper, which stamps |
| 44 | + # the three fields above into `args[0]`. |
| 45 | + # |
| 46 | + # Vanilla Resque jobs enqueued via Resque.enqueue carry no enqueue timestamps. |
| 47 | +# class MyJob |
| 48 | +# @queue = :foo |
| 49 | +# def self.perform; end |
| 50 | +# end |
| 51 | + # Their args are raw primitive values, not a wrapping hash. |
| 52 | + # For these jobs, queue_latency silently no-ops. |
| 53 | + # perform_duration works for both styles regardless. |
| 54 | + # |
| 55 | + # Payloads that replicate the three fields above are read the same way. |
| 56 | + # Detection is by shape, not by wrapper class name. |
| 57 | +# This means a vanilla job can opt in to queue_latency by either: |
| 58 | +# - converting to ActiveJob, or |
| 59 | +# - enqueueing through a small wrapper class that stamps these fields into args[0]. |
| 60 | + # |
| 61 | + module JobMetrics |
| 62 | + class << self |
##
# Entry point for the per-job metrics feature.
# While the feature flag is off this does nothing and returns nil.
# When it is on, the given client is stored for later pushes and hook
# installation is delegated to install_hooks. Calling it again simply
# replaces the stored client and re-invokes install_hooks.
#
# @param [PrometheusExporter::Client] client client used to push observations
#
def start(client:)
  if ::Bigcommerce::Prometheus.resque_per_job_metrics_enabled
    @client = client
    install_hooks
  end
end
| 75 | + |
##
# Reports how long a job waited in the queue before a worker picked it up.
# The wait is measured from payload.anchor_time to the current wall-clock time.
# The payload is expected to anchor on scheduled_at when present and fall back to
# enqueued_at, so an intentional retry/backoff delay is not reported as latency.
# Payloads without an anchor are skipped.
#
# Any StandardError raised while computing or pushing the observation is logged
# as a warning rather than propagated.
#
# @param [ActiveJobPayload, VanillaResquePayload] payload
#
def record_queue_latency(payload)
  waited_from = payload.anchor_time
  return unless waited_from

  elapsed = (Time.now - waited_from).to_f
  # The enqueuer's clock may run ahead of the worker's, which would place the anchor
  # in the future; report zero instead of a negative wait.
  elapsed = 0.0 if elapsed.negative?

  labels = { job_class: payload.job_class }
  @client.send_json(type: 'resque_job', metric: 'queue_latency', value: elapsed, custom_labels: labels)
rescue StandardError => e
  warning = "[bigcommerce-prometheus] resque_job queue_latency push failed: #{e.message}"
  ::Bigcommerce::Prometheus.logger&.warn(warning)
end
| 102 | + |
##
# Reports how long a completed job took, measured on the monotonic clock from
# started_at until this method runs. When invoked from the
# `Resque::Worker#perform_with_fork` prepend, that span covers the whole child
# lifetime: fork + reconnect + perform + exit.
#
# The subtraction is deliberately done in here rather than by the caller. The caller
# runs this from an ensure block, which must not raise over an exception that is
# already propagating; doing the arithmetic under this method's rescue means every
# recording failure is absorbed, including a nil started_at when something went
# wrong before timing began.
#
# @param [ActiveJobPayload, VanillaResquePayload] payload
# @param [Float] started_at monotonic timestamp taken just before the fork
#
def record_perform_duration(payload, started_at)
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  labels = { job_class: payload.job_class }
  @client.send_json(type: 'resque_job', metric: 'perform_duration', value: elapsed, custom_labels: labels)
rescue StandardError => e
  warning = "[bigcommerce-prometheus] resque_job perform_duration push failed: #{e.message}"
  ::Bigcommerce::Prometheus.logger&.warn(warning)
end
| 128 | + |
| 129 | + private |
| 130 | + |
# Prepends WorkerInstrumentation onto ::Resque::Worker the first time it is called.
# The @hooks_installed flag turns every later call into a no-op that returns nil.
def install_hooks
  unless @hooks_installed
    ::Resque::Worker.prepend(WorkerInstrumentation)
    @hooks_installed = true
  end
end
| 137 | + end |
| 138 | + |
##
# Prepended onto Resque::Worker to capture, for every job that goes through perform_with_fork:
#   - queue latency: before super
#   - perform duration: after super
#
# Instrumentation is strictly best-effort: a failure to build the metrics payload is
# logged and the job still runs, just without observations for that job.
module WorkerInstrumentation
  ##
  # Wraps the worker's perform_with_fork with metric recording.
  #
  # @param job the job handed to the worker; passed to JobPayload.for and on to super
  # @return whatever the wrapped perform_with_fork returns
  #
  def perform_with_fork(job, &block)
    # Building the payload is a metrics concern only. If it raises, the job must still be
    # performed, so the error is logged and both recorders are skipped for this job.
    payload = begin
      JobPayload.for(job)
    rescue StandardError => e
      ::Bigcommerce::Prometheus.logger&.warn(
        "[bigcommerce-prometheus] resque_job payload build failed: #{e.message}"
      )
      nil
    end

    JobMetrics.record_queue_latency(payload) if payload
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    super
  ensure
    # Bare locals only: argument expressions here evaluate outside the
    # recorder's rescue, so they must not be able to raise.
    JobMetrics.record_perform_duration(payload, started_at) if payload
  end
end
| 155 | + end |
| 156 | + end |
| 157 | + end |
| 158 | + end |
| 159 | +end |
0 commit comments