Skip to content

Commit fba1fde

Browse files
author
Willem Homan
committed
refactor(platform): PAYMENTS-11567 Split JobPayload into ActiveJobPayload and VanillaResquePayload
1 parent 599ca5c commit fba1fde

9 files changed

Lines changed: 337 additions & 183 deletions

File tree

lib/bigcommerce/prometheus.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
require_relative 'prometheus/integrations/railtie' if defined?(Rails)
4646
require_relative 'prometheus/integrations/puma'
4747
require_relative 'prometheus/integrations/resque'
48+
require_relative 'prometheus/integrations/resque/active_job_payload'
49+
require_relative 'prometheus/integrations/resque/vanilla_resque_payload'
4850
require_relative 'prometheus/integrations/resque/job_payload'
4951
require_relative 'prometheus/integrations/resque/job_metrics'
5052

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright (c) 2019-present, BigCommerce Pty. Ltd. All rights reserved
4+
#
5+
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
6+
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
7+
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
8+
# persons to whom the Software is furnished to do so, subject to the following conditions:
9+
#
10+
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
11+
# Software.
12+
#
13+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
14+
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
15+
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
16+
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
17+
#
18+
require 'time'
19+
20+
module Bigcommerce
21+
module Prometheus
22+
module Integrations
23+
class Resque
24+
##
25+
# Payload fields for an ActiveJob-shaped Resque job, read from the
26+
# inner hash at `args[0]`. ActiveJob's JobWrapper stamps the three
27+
# fields the per-job metrics consume:
28+
#
29+
# * job_class — the user's actual job class name; used as the
30+
# metric label.
31+
# * enqueued_at — ISO 8601 string; queue-latency anchor when
32+
# scheduled_at is absent.
33+
# * scheduled_at — ISO 8601 string; preferred over enqueued_at
34+
# when present (e.g. retries-with-backoff, so the
35+
# intentional wait isn't counted as latency).
36+
#
37+
# Eagerly parses in #initialize and exposes plain attr_readers —
38+
# does not hold a reference to the inner hash after construction.
39+
#
40+
class ActiveJobPayload
41+
# @return [String] the user's actual job class name
42+
attr_reader :job_class
43+
44+
# @return [Time, nil] the queue-latency anchor; nil when both
45+
# timestamps are absent or unparseable
46+
attr_reader :anchor_time
47+
48+
# @param [Hash] inner the ActiveJob-shaped hash at `args[0]`;
49+
# JobPayload.for guarantees a truthy 'job_class'
50+
def initialize(inner)
51+
@job_class = inner['job_class']
52+
@anchor_time = parse_time(inner['scheduled_at']) || parse_time(inner['enqueued_at'])
53+
end
54+
55+
private
56+
57+
def parse_time(value)
58+
return value if value.is_a?(Time)
59+
return nil if value.nil? || value.to_s.empty?
60+
61+
Time.iso8601(value.to_s)
62+
rescue ArgumentError
63+
nil
64+
end
65+
end
66+
end
67+
end
68+
end
69+
end

lib/bigcommerce/prometheus/integrations/resque/job_metrics.rb

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def start(client:)
7777
# retries-with-backoff don't show the intentional wait as latency),
7878
# falling back to enqueued_at.
7979
#
80-
# @param [JobPayload] payload
80+
# @param [ActiveJobPayload, VanillaResquePayload] payload
8181
#
8282
def record_queue_latency(payload)
8383
anchor = payload.anchor_time
@@ -105,7 +105,7 @@ def record_queue_latency(payload)
105105
# from the `Resque::Worker#perform_with_fork` prepend, so it measures
106106
# the full child lifetime (fork + reconnect + perform + exit).
107107
#
108-
# @param [JobPayload] payload
108+
# @param [ActiveJobPayload, VanillaResquePayload] payload
109109
# @param [Float] duration in seconds
110110
#
111111
def record_perform_duration(payload, duration)
@@ -134,13 +134,14 @@ def install_hooks
134134
##
135135
# Prepended onto Resque::Worker to capture both queue latency
136136
# (before super) and perform duration (after super) for every
137-
# job that goes through perform_with_fork. JobPayload is built
138-
# once per job and shared between the two recordings.
137+
# job that goes through perform_with_fork. The payload object is
138+
# built once per job via JobPayload.for and shared between the
139+
# two recordings.
139140
#
140141
module WorkerInstrumentation
141142
def perform_with_fork(job, &block)
142143
started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
143-
payload = JobPayload.new(job)
144+
payload = JobPayload.for(job)
144145
JobMetrics.record_queue_latency(payload)
145146
super
146147
ensure

lib/bigcommerce/prometheus/integrations/resque/job_payload.rb

Lines changed: 31 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -15,79 +15,48 @@
1515
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
1616
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
1717
#
18-
require 'time'
19-
2018
module Bigcommerce
2119
module Prometheus
2220
module Integrations
2321
class Resque
2422
##
25-
# Extracts the fields per-job metrics need from a Resque::Job's
26-
# payload. Eagerly parses in #initialize and exposes plain
27-
# attr_readers — does not hold a reference to the Resque::Job after
28-
# construction.
23+
# Classifies a Resque::Job's payload and builds the matching
24+
# shape-specific payload object for per-job metrics.
2925
#
30-
# See JobMetrics's class-level docs for the ActiveJob-shaped payload
31-
# contract this class consumes.
26+
# A payload is ActiveJob-shaped when `args[0]` is a Hash carrying a
27+
# truthy 'job_class' — the shape
28+
# ActiveJob::QueueAdapters::ResqueAdapter::JobWrapper produces
29+
# natively, and which service-local wrappers can produce
30+
# deliberately. Everything else — vanilla Resque jobs with primitive
31+
# args, nil or non-Hash payloads, `args` not being an Array — is
32+
# treated as vanilla.
3233
#
33-
class JobPayload
34-
# @return [String] the user's actual job class name. For
35-
# ActiveJob-shaped payloads this is `args[0]['job_class']`.
36-
# Falls back to the top-level Resque `class` field for vanilla
37-
# Resque payloads. `'unknown'` if neither is available, or if
38-
# the payload is malformed (nil, non-Hash, etc.).
39-
attr_reader :job_class
40-
41-
# @return [Time, nil] the queue-latency anchor. Prefers
42-
# `scheduled_at` over `enqueued_at` (so retries-with-backoff
43-
# don't count the intentional wait). nil for non-ActiveJob-shaped
44-
# payloads (where `args[0]` isn't a Hash), for payloads where
45-
# both timestamps are absent or unparseable, and for malformed
46-
# payloads.
47-
attr_reader :anchor_time
48-
49-
# @param [Resque::Job] resque_job
50-
def initialize(resque_job)
51-
payload = resque_job.payload || {}
52-
inner = extract_activejob_inner(payload)
53-
54-
@job_class = extract_job_class(payload, inner)
55-
@anchor_time = extract_anchor_time(inner)
56-
end
57-
58-
private
59-
60-
# Returns the inner ActiveJob-shaped payload (`args[0]` if it's a
61-
# Hash), or nil for any payload shape that isn't ActiveJob-wrapped
62-
# (vanilla Resque jobs with primitive args, non-Hash payloads,
63-
# `args` not being an Array, etc.).
64-
def extract_activejob_inner(payload)
65-
return nil unless payload.is_a?(Hash)
66-
67-
args = payload['args']
68-
first = args.is_a?(Array) ? args.first : nil
69-
first.is_a?(Hash) ? first : nil
70-
end
71-
72-
def extract_job_class(payload, inner)
73-
inner_class = inner.is_a?(Hash) ? inner['job_class'] : nil
74-
outer_class = payload.is_a?(Hash) ? payload['class'] : nil
75-
inner_class || outer_class || 'unknown'
76-
end
34+
# Both payload classes expose the same interface: #job_class
35+
# (String) and #anchor_time (Time or nil).
36+
#
37+
module JobPayload
38+
class << self
39+
##
40+
# @param [Resque::Job] resque_job
41+
# @return [ActiveJobPayload, VanillaResquePayload]
42+
#
43+
def for(resque_job)
44+
payload = resque_job.payload
45+
payload = {} unless payload.is_a?(Hash)
7746

78-
def extract_anchor_time(inner)
79-
return nil unless inner.is_a?(Hash)
47+
inner = activejob_inner(payload)
48+
inner ? ActiveJobPayload.new(inner) : VanillaResquePayload.new(payload)
49+
end
8050

81-
parse_time(inner['scheduled_at']) || parse_time(inner['enqueued_at'])
82-
end
51+
private
8352

84-
def parse_time(value)
85-
return value if value.is_a?(Time)
86-
return nil if value.nil? || value.to_s.empty?
53+
def activejob_inner(payload)
54+
args = payload['args']
55+
first = args.is_a?(Array) ? args.first : nil
56+
return nil unless first.is_a?(Hash) && first['job_class']
8757

88-
Time.iso8601(value.to_s)
89-
rescue ArgumentError
90-
nil
58+
first
59+
end
9160
end
9261
end
9362
end
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright (c) 2019-present, BigCommerce Pty. Ltd. All rights reserved
4+
#
5+
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
6+
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
7+
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
8+
# persons to whom the Software is furnished to do so, subject to the following conditions:
9+
#
10+
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
11+
# Software.
12+
#
13+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
14+
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
15+
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
16+
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
17+
#
18+
module Bigcommerce
19+
module Prometheus
20+
module Integrations
21+
class Resque
22+
##
23+
# Payload fields for a vanilla Resque job (`Resque.enqueue(MyJob,
24+
# ...)`) — the top-level 'class' field is the real job class and the
25+
# args are raw positional values. Vanilla payloads carry no enqueue
26+
# timestamps, so there is never a queue-latency anchor: queue_latency
27+
# no-ops for these jobs by construction. perform_duration is
28+
# unaffected.
29+
#
30+
class VanillaResquePayload
31+
# @return [String] the top-level Resque payload class, or
32+
# 'unknown' for malformed payloads
33+
attr_reader :job_class
34+
35+
# @param [Hash] payload the raw Resque payload (normalized to a
36+
# Hash by JobPayload.for)
37+
def initialize(payload)
38+
@job_class = payload['class'] || 'unknown'
39+
end
40+
41+
# @return [nil] vanilla Resque payloads have no enqueue timestamps
42+
def anchor_time
43+
nil
44+
end
45+
end
46+
end
47+
end
48+
end
49+
end
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright (c) 2019-present, BigCommerce Pty. Ltd. All rights reserved
4+
#
5+
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
6+
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
7+
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
8+
# persons to whom the Software is furnished to do so, subject to the following conditions:
9+
#
10+
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
11+
# Software.
12+
#
13+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
14+
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
15+
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
16+
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
17+
#
18+
require 'spec_helper'
19+
20+
describe Bigcommerce::Prometheus::Integrations::Resque::ActiveJobPayload do
21+
# Mirrors the inner hash ActiveJob's JobWrapper stamps into args[0].
22+
# JobPayload.for guarantees a truthy job_class before construction.
23+
def inner_payload(job_class: 'MyJob', enqueued_at: nil, scheduled_at: nil)
24+
{
25+
'job_class' => job_class,
26+
'arguments' => [],
27+
'enqueued_at' => enqueued_at,
28+
'scheduled_at' => scheduled_at
29+
}.compact
30+
end
31+
32+
describe '#job_class' do
33+
it 'returns the inner job_class' do
34+
expect(described_class.new(inner_payload(job_class: 'BigPay::SomePublishJob')).job_class)
35+
.to eq('BigPay::SomePublishJob')
36+
end
37+
end
38+
39+
describe '#anchor_time' do
40+
it 'prefers scheduled_at when both are present (excludes the intentional backoff)' do
41+
enqueued = (Time.now - 30).iso8601(6)
42+
scheduled = (Time.now - 1).iso8601(6)
43+
44+
anchor = described_class.new(inner_payload(enqueued_at: enqueued, scheduled_at: scheduled)).anchor_time
45+
46+
expect(anchor).to be_within(0.5).of(Time.now - 1)
47+
end
48+
49+
it 'falls back to enqueued_at when scheduled_at is absent' do
50+
enqueued = (Time.now - 3).iso8601(6)
51+
52+
anchor = described_class.new(inner_payload(enqueued_at: enqueued)).anchor_time
53+
54+
expect(anchor).to be_within(0.5).of(Time.now - 3)
55+
end
56+
57+
it 'returns nil when neither timestamp is present' do
58+
expect(described_class.new(inner_payload).anchor_time).to be_nil
59+
end
60+
61+
it 'handles a Time instance directly in the payload' do
62+
time = Time.now - 2
63+
64+
expect(described_class.new(inner_payload(enqueued_at: time)).anchor_time).to eq(time)
65+
end
66+
67+
it 'returns nil for an unparseable string (no exception)' do
68+
expect do
69+
@anchor = described_class.new(inner_payload(enqueued_at: 'not a real time')).anchor_time
70+
end.not_to raise_error
71+
expect(@anchor).to be_nil
72+
end
73+
74+
it 'returns nil for an empty string' do
75+
expect(described_class.new(inner_payload(enqueued_at: '')).anchor_time).to be_nil
76+
end
77+
end
78+
79+
describe 'field independence under partial failure' do
80+
it 'still returns job_class when the enqueued_at timestamp is unparseable' do
81+
result = described_class.new(inner_payload(job_class: 'BigPay::SomeJob', enqueued_at: 'not a real time'))
82+
83+
expect(result.job_class).to eq('BigPay::SomeJob')
84+
expect(result.anchor_time).to be_nil
85+
end
86+
end
87+
end

spec/bigcommerce/prometheus/integrations/resque/job_metrics_spec.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,12 @@ def resque_job_double(payload)
5050
double('Resque::Job', payload: payload)
5151
end
5252

53-
# Build a real JobPayload from a payload hash. Both record_* methods take
54-
# a JobPayload instance (the WorkerInstrumentation prepend constructs it
55-
# once per job and shares it between the two recordings); JobPayload's own
56-
# parsing edge cases are covered in job_payload_spec.rb.
53+
# Build a real payload object from a payload hash. Both record_* methods
54+
# take the payload object the WorkerInstrumentation prepend builds once per
55+
# job via JobPayload.for and shares between the two recordings;
56+
# classification and parsing edge cases are covered in the payload specs.
5757
def payload_for(payload_hash)
58-
Bigcommerce::Prometheus::Integrations::Resque::JobPayload.new(resque_job_double(payload_hash))
58+
Bigcommerce::Prometheus::Integrations::Resque::JobPayload.for(resque_job_double(payload_hash))
5959
end
6060

6161
def active_job_payload(job_class: 'MyJob', enqueued_at: nil, scheduled_at: nil)

0 commit comments

Comments
 (0)