Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require_relative 'handlers/default'
require_relative 'handlers/enqueue'
require_relative 'handlers/perform'
require_relative 'handlers/step'

module OpenTelemetry
module Instrumentation
Expand All @@ -27,6 +28,7 @@ module Handlers
# - perform
# - retry_stopped
# - discard
# - step (Continuations, Rails 8.1+)
#
# Ingress and Egress spans (perform, enqueue, enqueue_at) use Messaging semantic conventions for naming the span,
# while internal spans keep their ActiveSupport event name.
Expand All @@ -48,14 +50,16 @@ def subscribe
default_handler = Handlers::Default.new(parent_span_provider, mapper, config)
enqueue_handler = Handlers::Enqueue.new(parent_span_provider, mapper, config)
perform_handler = Handlers::Perform.new(parent_span_provider, mapper, config)
step_handler = Handlers::Step.new(parent_span_provider, mapper, config)

handlers_by_pattern = {
'enqueue' => enqueue_handler,
'enqueue_at' => enqueue_handler,
'enqueue_retry' => default_handler,
'perform' => perform_handler,
'retry_stopped' => default_handler,
'discard' => default_handler
'discard' => default_handler,
'step' => step_handler
}

@subscriptions = handlers_by_pattern.map do |key, handler|
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module Instrumentation
module ActiveJob
module Handlers
# Handles step.active_job to generate child spans for continuable job steps
class Step < Default
# Overrides the `Default#start_span` method to create a child span
# for a continuable job step
#
# @param name [String] of the Event
# @param id [String] of the event
# @param payload [Hash] containing job run information
# @return [Hash] with the span and generated context tokens
def start_span(name, _id, payload)
job = payload.fetch(:job)
step = payload.fetch(:step)
step_name = step.name.to_s

attributes = @mapper.call(payload).merge(
'rails.active_job.step.name' => step_name,
'rails.active_job.step.state' => step.resumed? ? 'resumed' : 'started'
)

span_name = span_name(job, step_name)
span = tracer.start_span(span_name, attributes: attributes)
token = OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span))

{ span: span, ctx_token: token }
end

# Overrides the `Default#finish` method to record step-specific
# attributes before closing the span
#
# @param _name [String] of the Event (unused)
# @param _id [String] of the event (unused)
# @param payload [Hash] containing job run information
def finish(_name, _id, payload)
otel = payload.delete(:__otel)
span = otel&.fetch(:span)
token = otel&.fetch(:ctx_token)

step = payload.fetch(:step)
span&.set_attribute('rails.active_job.step.result', 'interrupted') if payload[:interrupted]
span&.set_attribute('rails.active_job.step.cursor', step.cursor.to_s) if step.cursor

# Continuation::Interrupt is control flow, not a real error — skip recording it
on_exception(payload[:error] || payload[:exception_object], span) unless payload[:interrupted]
rescue StandardError => e
OpenTelemetry.handle_error(exception: e)
ensure
finish_span(span, token)
end
end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require 'test_helper'

require_relative '../../../../../lib/opentelemetry/instrumentation/active_job'

require 'active_job/continuation/test_helper' if defined?(ActiveJob::Continuable)

describe OpenTelemetry::Instrumentation::ActiveJob::Handlers::Step do
let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance }
let(:config) { { propagation_style: :link, span_naming: :queue } }
let(:exporter) { EXPORTER }
let(:spans) { exporter.finished_spans }
let(:process_span) { spans.find { |s| s.name == 'default process' } }
let(:step_spans) { spans.select { |s| s.name.end_with?('first_step', 'second_step', 'process_items') } }

before do
skip 'Requires ActiveJob::Continuable (Rails 8.1+)' unless defined?(ActiveJob::Continuable)

OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe
instrumentation.instance_variable_set(:@config, config)
instrumentation.instance_variable_set(:@installed, false)

instrumentation.install(config)

exporter.reset
end

describe 'basic step tracing' do
it 'creates a span for each step' do
ContinuableJob.perform_now

_(step_spans.length).must_equal 2
end

it 'names spans using the step name' do
ContinuableJob.perform_now

first_step_span = spans.find { |s| s.name == 'default first_step' }
second_step_span = spans.find { |s| s.name == 'default second_step' }

_(first_step_span).wont_be_nil
_(second_step_span).wont_be_nil
end

it 'creates step spans as children of the process span' do
ContinuableJob.perform_now

step_spans.each do |step_span|
_(step_span.parent_span_id).must_equal process_span.span_id
end
end
end

describe 'step attributes' do
it 'includes the step name' do
ContinuableJob.perform_now

first_step_span = spans.find { |s| s.name == 'default first_step' }
_(first_step_span.attributes['rails.active_job.step.name']).must_equal 'first_step'
end

it 'includes state as started for first execution' do
ContinuableJob.perform_now

first_step_span = spans.find { |s| s.name == 'default first_step' }
_(first_step_span.attributes['rails.active_job.step.state']).must_equal 'started'
end

it 'includes standard messaging attributes' do
ContinuableJob.perform_now

first_step_span = spans.find { |s| s.name == 'default first_step' }
_(first_step_span.attributes['code.namespace']).must_equal 'ContinuableJob'
_(first_step_span.attributes['messaging.system']).must_equal 'active_job'
_(first_step_span.attributes['messaging.destination']).must_equal 'default'
end

it 'does not marks the step as interrupted' do
ContinuableJob.perform_now

first_step_span = spans.find { |s| s.name == 'default first_step' }
assert_nil(first_step_span.attributes['rails.active_job.step.result'])
end
end

describe 'cursor tracking' do
it 'includes the cursor value when present' do
ContinuableWithCursorJob.perform_now

step_span = spans.find { |s| s.name == 'default process_items' }
_(step_span.attributes['rails.active_job.step.cursor']).must_equal '3'
end
end

describe 'interrupted step' do
before do
singleton_class.include ActiveJob::Continuation::TestHelper
ActiveJob::Base.queue_adapter = :test
end

after do
ActiveJob::Base.queue_adapter = :inline
end

it 'does not record an error on the step span' do
ContinuableWithCursorJob.perform_later
interrupt_job_during_step(ContinuableWithCursorJob, :process_items, cursor: 2) { perform_enqueued_jobs }

step_span = spans.find { |s| s.name == 'default process_items' }
_(step_span.status.code).must_equal OpenTelemetry::Trace::Status::OK
_(step_span.events).must_be_nil
end

it 'marks the step as interrupted' do
ContinuableWithCursorJob.perform_later
interrupt_job_during_step(ContinuableWithCursorJob, :process_items, cursor: 2) { perform_enqueued_jobs }

step_span = spans.find { |s| s.name == 'default process_items' }
_(step_span.attributes['rails.active_job.step.result']).must_equal 'interrupted'
end
end

describe 'span_naming option' do
describe 'when job_class' do
let(:config) { { propagation_style: :link, span_naming: :job_class } }

it 'names step spans using the job class' do
ContinuableJob.perform_now

first_step_span = spans.find { |s| s.name == 'ContinuableJob first_step' }
_(first_step_span).wont_be_nil
end
end
end
end
33 changes: 33 additions & 0 deletions instrumentation/active_job/test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Bundler.require(:default, :development, :test)

require 'active_job'
require 'active_support/core_ext/object/with'
require 'opentelemetry-instrumentation-active_job'
require 'minitest/autorun'
require 'rspec/mocks/minitest_integration'
Expand Down Expand Up @@ -90,6 +91,38 @@ def perform
end
end

if defined?(ActiveJob::Continuable)
class ContinuableJob < ActiveJob::Base
include ActiveJob::Continuable

self.resume_options = { wait: 0 }

def perform
step :first_step do
# no-op
end

step :second_step do
# no-op
end
end
end

class ContinuableWithCursorJob < ActiveJob::Base
include ActiveJob::Continuable

self.resume_options = { wait: 0 }

def perform
step(:process_items, start: 0) do |step|
(step.cursor..2).each do |i|
step.set! i + 1
end
end
end
end
end

ActiveJob::Base.queue_adapter = :inline
ActiveJob::Base.logger = Logger.new($stderr, level: ENV.fetch('OTEL_LOG_LEVEL', 'fatal').to_sym)

Expand Down
Loading