diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb index 70bebf0617..251f11a94b 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb @@ -8,6 +8,7 @@ require_relative 'handlers/default' require_relative 'handlers/enqueue' require_relative 'handlers/perform' +require_relative 'handlers/step' module OpenTelemetry module Instrumentation @@ -27,6 +28,7 @@ module Handlers # - perform # - retry_stopped # - discard + # - step (Continuations, Rails 8.1+) # # Ingress and Egress spans (perform, enqueue, enqueue_at) use Messaging semantic conventions for naming the span, # while internal spans keep their ActiveSupport event name. @@ -48,6 +50,7 @@ def subscribe default_handler = Handlers::Default.new(parent_span_provider, mapper, config) enqueue_handler = Handlers::Enqueue.new(parent_span_provider, mapper, config) perform_handler = Handlers::Perform.new(parent_span_provider, mapper, config) + step_handler = Handlers::Step.new(parent_span_provider, mapper, config) handlers_by_pattern = { 'enqueue' => enqueue_handler, @@ -55,7 +58,8 @@ def subscribe 'enqueue_retry' => default_handler, 'perform' => perform_handler, 'retry_stopped' => default_handler, - 'discard' => default_handler + 'discard' => default_handler, + 'step' => step_handler } @subscriptions = handlers_by_pattern.map do |key, handler| diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step.rb new file mode 100644 index 0000000000..d1d3ead0c2 --- /dev/null +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module Instrumentation + module ActiveJob + module Handlers + # Handles step.active_job to generate child spans for continuable job steps + class Step < Default + # Overrides the `Default#start_span` method to create a child span + # for a continuable job step + # + # @param name [String] of the Event + # @param id [String] of the event + # @param payload [Hash] containing job run information + # @return [Hash] with the span and generated context tokens + def start_span(name, _id, payload) + job = payload.fetch(:job) + step = payload.fetch(:step) + step_name = step.name.to_s + + attributes = @mapper.call(payload).merge( + 'rails.active_job.step.name' => step_name, + 'rails.active_job.step.state' => step.resumed? ? 'resumed' : 'started' + ) + + span_name = span_name(job, step_name) + span = tracer.start_span(span_name, attributes: attributes) + token = OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span)) + + { span: span, ctx_token: token } + end + + # Overrides the `Default#finish` method to record step-specific + # attributes before closing the span + # + # @param _name [String] of the Event (unused) + # @param _id [String] of the event (unused) + # @param payload [Hash] containing job run information + def finish(_name, _id, payload) + otel = payload.delete(:__otel) + span = otel&.fetch(:span) + token = otel&.fetch(:ctx_token) + + step = payload.fetch(:step) + span&.set_attribute('rails.active_job.step.result', 'interrupted') if payload[:interrupted] + span&.set_attribute('rails.active_job.step.cursor', step.cursor.to_s) if step.cursor + + # Continuation::Interrupt is control flow, not a real error — skip recording it + on_exception(payload[:error] || payload[:exception_object], span) unless payload[:interrupted] + rescue StandardError => e + OpenTelemetry.handle_error(exception: e) + ensure + finish_span(span, token) + end + end + end + end + end +end diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_test.rb new file mode 100644 index 0000000000..c03ae3f488 --- /dev/null +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_test.rb @@ -0,0 +1,140 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +require_relative '../../../../../lib/opentelemetry/instrumentation/active_job' + +require 'active_job/continuation/test_helper' if defined?(ActiveJob::Continuable) + +describe OpenTelemetry::Instrumentation::ActiveJob::Handlers::Step do + let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } + let(:config) { { propagation_style: :link, span_naming: :queue } } + let(:exporter) { EXPORTER } + let(:spans) { exporter.finished_spans } + let(:process_span) { spans.find { |s| s.name == 'default process' } } + let(:step_spans) { spans.select { |s| s.name.end_with?('first_step', 'second_step', 'process_items') } } + + before do + skip 'Requires ActiveJob::Continuable (Rails 8.1+)' unless defined?(ActiveJob::Continuable) + + OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe + instrumentation.instance_variable_set(:@config, config) + instrumentation.instance_variable_set(:@installed, false) + + instrumentation.install(config) + + exporter.reset + end + + describe 'basic step tracing' do + it 'creates a span for each step' do + ContinuableJob.perform_now + + _(step_spans.length).must_equal 2 + end + + it 'names spans using the step name' do + ContinuableJob.perform_now + + first_step_span = spans.find { |s| s.name == 'default first_step' } + second_step_span = spans.find { |s| s.name == 'default second_step' } + + _(first_step_span).wont_be_nil + _(second_step_span).wont_be_nil + end + + it 'creates step spans as children of the process span' do + ContinuableJob.perform_now + + step_spans.each do |step_span| + _(step_span.parent_span_id).must_equal process_span.span_id + end + end + end + + describe 'step attributes' do + it 'includes the step name' do + ContinuableJob.perform_now + + first_step_span = spans.find { |s| s.name == 'default first_step' } + _(first_step_span.attributes['rails.active_job.step.name']).must_equal 'first_step' + end + + it 'includes state as started for first execution' do + ContinuableJob.perform_now + + first_step_span = spans.find { |s| s.name == 'default first_step' } + _(first_step_span.attributes['rails.active_job.step.state']).must_equal 'started' + end + + it 'includes standard messaging attributes' do + ContinuableJob.perform_now + + first_step_span = spans.find { |s| s.name == 'default first_step' } + _(first_step_span.attributes['code.namespace']).must_equal 'ContinuableJob' + _(first_step_span.attributes['messaging.system']).must_equal 'active_job' + _(first_step_span.attributes['messaging.destination']).must_equal 'default' + end + + it 'does not marks the step as interrupted' do + ContinuableJob.perform_now + + first_step_span = spans.find { |s| s.name == 'default first_step' } + assert_nil(first_step_span.attributes['rails.active_job.step.result']) + end + end + + describe 'cursor tracking' do + it 'includes the cursor value when present' do + ContinuableWithCursorJob.perform_now + + step_span = spans.find { |s| s.name == 'default process_items' } + _(step_span.attributes['rails.active_job.step.cursor']).must_equal '3' + end + end + + describe 'interrupted step' do + before do + singleton_class.include ActiveJob::Continuation::TestHelper + ActiveJob::Base.queue_adapter = :test + end + + after do + ActiveJob::Base.queue_adapter = :inline + end + + it 'does not record an error on the step span' do + ContinuableWithCursorJob.perform_later + interrupt_job_during_step(ContinuableWithCursorJob, :process_items, cursor: 2) { perform_enqueued_jobs } + + step_span = spans.find { |s| s.name == 'default process_items' } + _(step_span.status.code).must_equal OpenTelemetry::Trace::Status::OK + _(step_span.events).must_be_nil + end + + it 'marks the step as interrupted' do + ContinuableWithCursorJob.perform_later + interrupt_job_during_step(ContinuableWithCursorJob, :process_items, cursor: 2) { perform_enqueued_jobs } + + step_span = spans.find { |s| s.name == 'default process_items' } + _(step_span.attributes['rails.active_job.step.result']).must_equal 'interrupted' + end + end + + describe 'span_naming option' do + describe 'when job_class' do + let(:config) { { propagation_style: :link, span_naming: :job_class } } + + it 'names step spans using the job class' do + ContinuableJob.perform_now + + first_step_span = spans.find { |s| s.name == 'ContinuableJob first_step' } + _(first_step_span).wont_be_nil + end + end + end +end diff --git a/instrumentation/active_job/test/test_helper.rb b/instrumentation/active_job/test/test_helper.rb index 69a5826a9f..b21bde653d 100644 --- a/instrumentation/active_job/test/test_helper.rb +++ b/instrumentation/active_job/test/test_helper.rb @@ -10,6 +10,7 @@ Bundler.require(:default, :development, :test) require 'active_job' +require 'active_support/core_ext/object/with' require 'opentelemetry-instrumentation-active_job' require 'minitest/autorun' require 'rspec/mocks/minitest_integration' @@ -90,6 +91,38 @@ def perform end end +if defined?(ActiveJob::Continuable) + class ContinuableJob < ActiveJob::Base + include ActiveJob::Continuable + + self.resume_options = { wait: 0 } + + def perform + step :first_step do + # no-op + end + + step :second_step do + # no-op + end + end + end + + class ContinuableWithCursorJob < ActiveJob::Base + include ActiveJob::Continuable + + self.resume_options = { wait: 0 } + + def perform + step(:process_items, start: 0) do |step| + (step.cursor..2).each do |i| + step.set! i + 1 + end + end + end + end +end + ActiveJob::Base.queue_adapter = :inline ActiveJob::Base.logger = Logger.new($stderr, level: ENV.fetch('OTEL_LOG_LEVEL', 'fatal').to_sym)