From b7c39e01b1cda3b5d2065aa15cee6b34c449248a Mon Sep 17 00:00:00 2001 From: Bart de Water <118401830+bdewater-thatch@users.noreply.github.com> Date: Mon, 13 Apr 2026 11:33:11 -0400 Subject: [PATCH 1/5] feat(active_job): add step.active_job span handler for Continuation Add tracing support for Rails 8.1 Active Job Continuation steps. Each step in a continuable job gets its own child span under the perform span, with attributes for step name, cursor, resumed, and interrupted state. --- .../instrumentation/active_job/handlers.rb | 5 +- .../active_job/handlers/step.rb | 63 ++++++++ .../active_job/handlers/step_test.rb | 140 ++++++++++++++++++ .../active_job/test/test_helper.rb | 33 +++++ 4 files changed, 240 insertions(+), 1 deletion(-) create mode 100644 instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step.rb create mode 100644 instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_test.rb diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb index 70bebf0617..cff9f19448 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb @@ -8,6 +8,7 @@ require_relative 'handlers/default' require_relative 'handlers/enqueue' require_relative 'handlers/perform' +require_relative 'handlers/step' module OpenTelemetry module Instrumentation @@ -48,6 +49,7 @@ def subscribe default_handler = Handlers::Default.new(parent_span_provider, mapper, config) enqueue_handler = Handlers::Enqueue.new(parent_span_provider, mapper, config) perform_handler = Handlers::Perform.new(parent_span_provider, mapper, config) + step_handler = Handlers::Step.new(parent_span_provider, mapper, config) handlers_by_pattern = { 'enqueue' => enqueue_handler, @@ -55,7 +57,8 @@ def subscribe 'enqueue_retry' => default_handler, 'perform' => perform_handler, 'retry_stopped' => default_handler, - 'discard' => default_handler + 'discard' => default_handler, + 'step' => step_handler } @subscriptions = handlers_by_pattern.map do |key, handler| diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step.rb new file mode 100644 index 0000000000..21578d44f0 --- /dev/null +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module Instrumentation + module ActiveJob + module Handlers + # Handles step.active_job to generate child spans for continuable job steps + class Step < Default + # Overrides the `Default#start_span` method to create a child span + # for a continuable job step + # + # @param name [String] of the Event + # @param id [String] of the event + # @param payload [Hash] containing job run information + # @return [Hash] with the span and generated context tokens + def start_span(name, _id, payload) + job = payload.fetch(:job) + step = payload.fetch(:step) + step_name = step.name.to_s + + attributes = @mapper.call(payload).merge( + 'messaging.active_job.step.name' => step_name, + 'messaging.active_job.step.resumed' => step.resumed? + ) + + span_name = span_name(job, step_name) + span = tracer.start_span(span_name, attributes: attributes) + token = OpenTelemetry::Context.attach(OpenTelemetry::Trace.context_with_span(span)) + + { span: span, ctx_token: token } + end + + # Overrides the `Default#finish` method to record step-specific + # attributes before closing the span + # + # @param _name [String] of the Event (unused) + # @param _id [String] of the event (unused) + # @param payload [Hash] containing job run information + def finish(_name, _id, payload) + otel = payload.delete(:__otel) + span = otel&.fetch(:span) + token = otel&.fetch(:ctx_token) + + step = payload.fetch(:step) + span&.set_attribute('messaging.active_job.step.interrupted', payload[:interrupted]) if payload.key?(:interrupted) + span&.set_attribute('messaging.active_job.step.cursor', step.cursor.to_s) if step.cursor + + # Continuation::Interrupt is control flow, not a real error — skip recording it + on_exception(payload[:error] || payload[:exception_object], span) unless payload[:interrupted] + rescue StandardError => e + OpenTelemetry.handle_error(exception: e) + ensure + finish_span(span, token) + end + end + end + end + end +end diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_test.rb new file mode 100644 index 0000000000..79157bc847 --- /dev/null +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_test.rb @@ -0,0 +1,140 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +require_relative '../../../../../lib/opentelemetry/instrumentation/active_job' + +require 'active_job/continuation/test_helper' if defined?(ActiveJob::Continuable) + +describe OpenTelemetry::Instrumentation::ActiveJob::Handlers::Step do + let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } + let(:config) { { propagation_style: :link, span_naming: :queue } } + let(:exporter) { EXPORTER } + let(:spans) { exporter.finished_spans } + let(:process_span) { spans.find { |s| s.name == 'default process' } } + let(:step_spans) { spans.select { |s| s.name.end_with?('first_step', 'second_step', 'process_items') } } + + before do + skip 'Requires ActiveJob::Continuable (Rails 8.1+)' unless defined?(ActiveJob::Continuable) + + OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe + instrumentation.instance_variable_set(:@config, config) + instrumentation.instance_variable_set(:@installed, false) + + instrumentation.install(config) + + exporter.reset + end + + describe 'basic step tracing' do + it 'creates a span for each step' do + ContinuableJob.perform_now + + _(step_spans.length).must_equal 2 + end + + it 'names spans using the step name' do + ContinuableJob.perform_now + + first_step_span = spans.find { |s| s.name == 'default first_step' } + second_step_span = spans.find { |s| s.name == 'default second_step' } + + _(first_step_span).wont_be_nil + _(second_step_span).wont_be_nil + end + + it 'creates step spans as children of the process span' do + ContinuableJob.perform_now + + step_spans.each do |step_span| + _(step_span.parent_span_id).must_equal process_span.span_id + end + end + end + + describe 'step attributes' do + it 'includes the step name' do + ContinuableJob.perform_now + + first_step_span = spans.find { |s| s.name == 'default first_step' } + _(first_step_span.attributes['messaging.active_job.step.name']).must_equal 'first_step' + end + + it 'includes resumed as false for first execution' do + ContinuableJob.perform_now + + first_step_span = spans.find { |s| s.name == 'default first_step' } + _(first_step_span.attributes['messaging.active_job.step.resumed']).must_equal false + end + + it 'includes standard messaging attributes' do + ContinuableJob.perform_now + + first_step_span = spans.find { |s| s.name == 'default first_step' } + _(first_step_span.attributes['code.namespace']).must_equal 'ContinuableJob' + _(first_step_span.attributes['messaging.system']).must_equal 'active_job' + _(first_step_span.attributes['messaging.destination']).must_equal 'default' + end + + it 'includes interrupted attribute as false when not interrupted' do + ContinuableJob.perform_now + + first_step_span = spans.find { |s| s.name == 'default first_step' } + _(first_step_span.attributes['messaging.active_job.step.interrupted']).must_equal false + end + end + + describe 'cursor tracking' do + it 'includes the cursor value when present' do + ContinuableWithCursorJob.perform_now + + step_span = spans.find { |s| s.name == 'default process_items' } + _(step_span.attributes['messaging.active_job.step.cursor']).must_equal '3' + end + end + + describe 'interrupted step' do + before do + singleton_class.include ActiveJob::Continuation::TestHelper + ActiveJob::Base.queue_adapter = :test + end + + after do + ActiveJob::Base.queue_adapter = :inline + end + + it 'does not record an error on the step span' do + ContinuableWithCursorJob.perform_later + interrupt_job_during_step(ContinuableWithCursorJob, :process_items, cursor: 2) { perform_enqueued_jobs } + + step_span = spans.find { |s| s.name == 'default process_items' } + _(step_span.status.code).must_equal OpenTelemetry::Trace::Status::OK + _(step_span.events).must_be_nil + end + + it 'marks the step as interrupted' do + ContinuableWithCursorJob.perform_later + interrupt_job_during_step(ContinuableWithCursorJob, :process_items, cursor: 2) { perform_enqueued_jobs } + + step_span = spans.find { |s| s.name == 'default process_items' } + _(step_span.attributes['messaging.active_job.step.interrupted']).must_equal true + end + end + + describe 'span_naming option' do + describe 'when job_class' do + let(:config) { { propagation_style: :link, span_naming: :job_class } } + + it 'names step spans using the job class' do + ContinuableJob.perform_now + + first_step_span = spans.find { |s| s.name == 'ContinuableJob first_step' } + _(first_step_span).wont_be_nil + end + end + end +end diff --git a/instrumentation/active_job/test/test_helper.rb b/instrumentation/active_job/test/test_helper.rb index 69a5826a9f..b21bde653d 100644 --- a/instrumentation/active_job/test/test_helper.rb +++ b/instrumentation/active_job/test/test_helper.rb @@ -10,6 +10,7 @@ Bundler.require(:default, :development, :test) require 'active_job' +require 'active_support/core_ext/object/with' require 'opentelemetry-instrumentation-active_job' require 'minitest/autorun' require 'rspec/mocks/minitest_integration' @@ -90,6 +91,38 @@ def perform end end +if defined?(ActiveJob::Continuable) + class ContinuableJob < ActiveJob::Base + include ActiveJob::Continuable + + self.resume_options = { wait: 0 } + + def perform + step :first_step do + # no-op + end + + step :second_step do + # no-op + end + end + end + + class ContinuableWithCursorJob < ActiveJob::Base + include ActiveJob::Continuable + + self.resume_options = { wait: 0 } + + def perform + step(:process_items, start: 0) do |step| + (step.cursor..2).each do |i| + step.set! i + 1 + end + end + end + end +end + ActiveJob::Base.queue_adapter = :inline ActiveJob::Base.logger = Logger.new($stderr, level: ENV.fetch('OTEL_LOG_LEVEL', 'fatal').to_sym) From 7995c4abaa34017ed481179e2a12b8cbfbe3fe5a Mon Sep 17 00:00:00 2001 From: Bart de Water <118401830+bdewater-thatch@users.noreply.github.com> Date: Mon, 13 Apr 2026 11:33:30 -0400 Subject: [PATCH 2/5] feat(active_job): add resume.active_job span event for Continuation Record a span event on the perform (ingress) span when a continuable job resumes from a previous execution, with attributes for resumption count, description, and completed steps. --- .../instrumentation/active_job/handlers.rb | 5 +- .../active_job/handlers/resume.rb | 40 +++++++++ .../active_job/handlers/resume_test.rb | 85 +++++++++++++++++++ 3 files changed, 129 insertions(+), 1 deletion(-) create mode 100644 instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/resume.rb create mode 100644 instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/resume_test.rb diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb index cff9f19448..0489f7f7e1 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb @@ -8,6 +8,7 @@ require_relative 'handlers/default' require_relative 'handlers/enqueue' require_relative 'handlers/perform' +require_relative 'handlers/resume' require_relative 'handlers/step' module OpenTelemetry @@ -50,6 +51,7 @@ def subscribe enqueue_handler = Handlers::Enqueue.new(parent_span_provider, mapper, config) perform_handler = Handlers::Perform.new(parent_span_provider, mapper, config) step_handler = Handlers::Step.new(parent_span_provider, mapper, config) + resume_handler = Handlers::Resume.new(parent_span_provider) handlers_by_pattern = { 'enqueue' => enqueue_handler, @@ -58,7 +60,8 @@ def subscribe 'perform' => perform_handler, 'retry_stopped' => default_handler, 'discard' => default_handler, - 'step' => step_handler + 'step' => step_handler, + 'resume' => resume_handler } @subscriptions = handlers_by_pattern.map do |key, handler| diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/resume.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/resume.rb new file mode 100644 index 0000000000..e6e973d757 --- /dev/null +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/resume.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module Instrumentation + module ActiveJob + module Handlers + # Handles resume.active_job to record a span event on the ingress span + # when a continuable job resumes from a previous execution + class Resume + def initialize(parent_span_provider) + @parent_span_provider = parent_span_provider + end + + # @param _name [String] of the Event (unused) + # @param _id [String] of the event (unused) + # @param payload [Hash] containing job run information + def start(_name, _id, payload) + span = @parent_span_provider.current_span + return unless span.recording? + + job = payload.fetch(:job) + + attributes = { + 'messaging.active_job.continuation.resumptions' => job.resumptions, + 'messaging.active_job.continuation.description' => payload[:description] + } + + span.add_event('resume', attributes: attributes) + end + + def finish(_name, _id, _payload); end + end + end + end + end +end diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/resume_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/resume_test.rb new file mode 100644 index 0000000000..7c45e30608 --- /dev/null +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/resume_test.rb @@ -0,0 +1,85 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +require_relative '../../../../../lib/opentelemetry/instrumentation/active_job' + +require 'active_job/continuation/test_helper' if defined?(ActiveJob::Continuable) + +describe OpenTelemetry::Instrumentation::ActiveJob::Handlers::Resume do + let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } + let(:config) { { propagation_style: :link, span_naming: :queue } } + let(:exporter) { EXPORTER } + let(:spans) { exporter.finished_spans } + let(:process_spans) { spans.select { |s| s.name == 'default process' } } + + before do + skip 'Requires ActiveJob::Continuable (Rails 8.1+)' unless defined?(ActiveJob::Continuable) + + singleton_class.include ActiveJob::Continuation::TestHelper + + OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe + instrumentation.instance_variable_set(:@config, config) + instrumentation.instance_variable_set(:@installed, false) + + instrumentation.install(config) + ActiveJob::Base.queue_adapter = :test + + exporter.reset + end + + after do + ActiveJob::Base.queue_adapter = :inline + end + + describe 'when a job resumes' do + it 'adds a resume event to the process span' do + ContinuableJob.perform_later + interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } + perform_enqueued_jobs + + resumed_process_span = process_spans.last + resume_events = resumed_process_span.events&.select { |e| e.name == 'resume' } || [] + + _(resume_events.length).must_equal 1 + end + + it 'includes resumptions count' do + ContinuableJob.perform_later + interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } + perform_enqueued_jobs + + resumed_process_span = process_spans.last + resume_event = resumed_process_span.events.find { |e| e.name == 'resume' } + + _(resume_event.attributes['messaging.active_job.continuation.resumptions']).must_equal 1 + end + + it 'includes description' do + ContinuableJob.perform_later + interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } + perform_enqueued_jobs + + resumed_process_span = process_spans.last + resume_event = resumed_process_span.events.find { |e| e.name == 'resume' } + + _(resume_event.attributes['messaging.active_job.continuation.description']).must_equal "after 'first_step'" + end + end + + describe 'when a job has not been interrupted' do + it 'does not add a resume event' do + ContinuableJob.perform_later + perform_enqueued_jobs + + process_span = process_spans.first + resume_events = process_span.events&.select { |e| e.name == 'resume' } || [] + + _(resume_events).must_be(:empty?) + end + end +end From be61b162745aa3b9b026afd06e2576d2e535cf9b Mon Sep 17 00:00:00 2001 From: Bart de Water <118401830+bdewater-thatch@users.noreply.github.com> Date: Mon, 13 Apr 2026 11:33:47 -0400 Subject: [PATCH 3/5] feat(active_job): add step_skipped.active_job span event for Continuation Record a span event on the perform span when a previously completed step is skipped during a resumed continuable job execution, including the step name as an attribute. --- .../instrumentation/active_job/handlers.rb | 5 +- .../active_job/handlers/step_skipped.rb | 35 +++++++++ .../active_job/handlers/step_skipped_test.rb | 75 +++++++++++++++++++ 3 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step_skipped.rb create mode 100644 instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_skipped_test.rb diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb index 0489f7f7e1..6e4a267558 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb @@ -10,6 +10,7 @@ require_relative 'handlers/perform' require_relative 'handlers/resume' require_relative 'handlers/step' +require_relative 'handlers/step_skipped' module OpenTelemetry module Instrumentation @@ -52,6 +53,7 @@ def subscribe perform_handler = Handlers::Perform.new(parent_span_provider, mapper, config) step_handler = Handlers::Step.new(parent_span_provider, mapper, config) resume_handler = Handlers::Resume.new(parent_span_provider) + step_skipped_handler = Handlers::StepSkipped.new(parent_span_provider) handlers_by_pattern = { 'enqueue' => enqueue_handler, @@ -61,7 +63,8 @@ def subscribe 'retry_stopped' => default_handler, 'discard' => default_handler, 'step' => step_handler, - 'resume' => resume_handler + 'resume' => resume_handler, + 'step_skipped' => step_skipped_handler } @subscriptions = handlers_by_pattern.map do |key, handler| diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step_skipped.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step_skipped.rb new file mode 100644 index 0000000000..bf5e454ca1 --- /dev/null +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step_skipped.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module Instrumentation + module ActiveJob + module Handlers + # Handles step_skipped.active_job to record a span event on the ingress span + # when a previously completed step is skipped during a resumed job execution + class StepSkipped + def initialize(parent_span_provider) + @parent_span_provider = parent_span_provider + end + + # @param _name [String] of the Event (unused) + # @param _id [String] of the event (unused) + # @param payload [Hash] containing job run information + def start(_name, _id, payload) + span = @parent_span_provider.current_span + return unless span.recording? + + span.add_event('step_skipped', attributes: { + 'messaging.active_job.step.name' => payload[:step].to_s + }) + end + + def finish(_name, _id, _payload); end + end + end + end + end +end diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_skipped_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_skipped_test.rb new file mode 100644 index 0000000000..3e36d8896b --- /dev/null +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_skipped_test.rb @@ -0,0 +1,75 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +require_relative '../../../../../lib/opentelemetry/instrumentation/active_job' + +require 'active_job/continuation/test_helper' if defined?(ActiveJob::Continuable) + +describe OpenTelemetry::Instrumentation::ActiveJob::Handlers::StepSkipped do + let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } + let(:config) { { propagation_style: :link, span_naming: :queue } } + let(:exporter) { EXPORTER } + let(:spans) { exporter.finished_spans } + + before do + skip 'Requires ActiveJob::Continuable (Rails 8.1+)' unless defined?(ActiveJob::Continuable) + + singleton_class.include ActiveJob::Continuation::TestHelper + + OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe + instrumentation.instance_variable_set(:@config, config) + instrumentation.instance_variable_set(:@installed, false) + + instrumentation.install(config) + ActiveJob::Base.queue_adapter = :test + + exporter.reset + end + + after do + ActiveJob::Base.queue_adapter = :inline + end + + describe 'when a step is skipped on resume' do + it 'adds a step_skipped event to the process span' do + ContinuableJob.perform_later + interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } + exporter.reset + perform_enqueued_jobs + + process_span = spans.find { |s| s.name == 'default process' } + skipped_events = process_span.events&.select { |e| e.name == 'step_skipped' } || [] + + _(skipped_events.length).must_equal 1 + end + + it 'includes the step name' do + ContinuableJob.perform_later + interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } + exporter.reset + perform_enqueued_jobs + + process_span = spans.find { |s| s.name == 'default process' } + skipped_event = process_span.events.find { |e| e.name == 'step_skipped' } + + _(skipped_event.attributes['messaging.active_job.step.name']).must_equal 'first_step' + end + end + + describe 'when no steps are skipped' do + it 'does not add a step_skipped event' do + ContinuableJob.perform_later + perform_enqueued_jobs + + process_span = spans.find { |s| s.name == 'default process' } + skipped_events = process_span.events&.select { |e| e.name == 'step_skipped' } || [] + + _(skipped_events).must_be(:empty?) + end + end +end From d8fa1a605722aaa8ef40364fcc4a70c22773471a Mon Sep 17 00:00:00 2001 From: Bart de Water <118401830+bdewater-thatch@users.noreply.github.com> Date: Mon, 13 Apr 2026 11:33:52 -0400 Subject: [PATCH 4/5] feat(active_job): add interrupt.active_job span event for Continuation Record a span event on the current span when a continuable job is interrupted, with attributes for the interrupt reason, description, and completed steps. --- .../instrumentation/active_job/handlers.rb | 11 ++- .../active_job/handlers/interrupt.rb | 38 ++++++++ .../active_job/handlers/interrupt_test.rb | 91 +++++++++++++++++++ 3 files changed, 139 insertions(+), 1 deletion(-) create mode 100644 instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/interrupt.rb create mode 100644 instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/interrupt_test.rb diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb index 6e4a267558..1dc6637378 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb @@ -8,6 +8,7 @@ require_relative 'handlers/default' require_relative 'handlers/enqueue' require_relative 'handlers/perform' +require_relative 'handlers/interrupt' require_relative 'handlers/resume' require_relative 'handlers/step' require_relative 'handlers/step_skipped' @@ -30,6 +31,12 @@ module Handlers # - perform # - retry_stopped # - discard + # - step (Continuations, Rails 8.1+) + # + # The following events are recorded as span events: + # - resume (Continuations, Rails 8.1+) + # - step_skipped (Continuations, Rails 8.1+) + # - interrupt (Continuations, Rails 8.1+) # # Ingress and Egress spans (perform, enqueue, enqueue_at) use Messaging semantic conventions for naming the span, # while internal spans keep their ActiveSupport event name. @@ -52,6 +59,7 @@ def subscribe enqueue_handler = Handlers::Enqueue.new(parent_span_provider, mapper, config) perform_handler = Handlers::Perform.new(parent_span_provider, mapper, config) step_handler = Handlers::Step.new(parent_span_provider, mapper, config) + interrupt_handler = Handlers::Interrupt.new(parent_span_provider) resume_handler = Handlers::Resume.new(parent_span_provider) step_skipped_handler = Handlers::StepSkipped.new(parent_span_provider) @@ -64,7 +72,8 @@ def subscribe 'discard' => default_handler, 'step' => step_handler, 'resume' => resume_handler, - 'step_skipped' => step_skipped_handler + 'step_skipped' => step_skipped_handler, + 'interrupt' => interrupt_handler } @subscriptions = handlers_by_pattern.map do |key, handler| diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/interrupt.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/interrupt.rb new file mode 100644 index 0000000000..f4b85388d0 --- /dev/null +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/interrupt.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +module OpenTelemetry + module Instrumentation + module ActiveJob + module Handlers + # Handles interrupt.active_job to record a span event when a continuable + # job is interrupted for resumption + class Interrupt + def initialize(parent_span_provider) + @parent_span_provider = parent_span_provider + end + + # @param _name [String] of the Event (unused) + # @param _id [String] of the event (unused) + # @param payload [Hash] containing job run information + def start(_name, _id, payload) + span = @parent_span_provider.current_span + return unless span.recording? + + attributes = { + 'messaging.active_job.continuation.reason' => payload[:reason].to_s, + 'messaging.active_job.continuation.description' => payload[:description] + } + + span.add_event('interrupt', attributes: attributes) + end + + def finish(_name, _id, _payload); end + end + end + end + end +end diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/interrupt_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/interrupt_test.rb new file mode 100644 index 0000000000..7ab70122af --- /dev/null +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/interrupt_test.rb @@ -0,0 +1,91 @@ +# frozen_string_literal: true + +# Copyright The OpenTelemetry Authors +# +# SPDX-License-Identifier: Apache-2.0 + +require 'test_helper' + +require_relative '../../../../../lib/opentelemetry/instrumentation/active_job' + +require 'active_job/continuation/test_helper' if defined?(ActiveJob::Continuable) + +describe OpenTelemetry::Instrumentation::ActiveJob::Handlers::Interrupt do + let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } + let(:config) { { propagation_style: :link, span_naming: :queue } } + let(:exporter) { EXPORTER } + let(:spans) { exporter.finished_spans } + let(:process_span) { spans.find { |s| s.name == 'default process' } } + + before do + skip 'Requires ActiveJob::Continuable (Rails 8.1+)' unless defined?(ActiveJob::Continuable) + + singleton_class.include ActiveJob::Continuation::TestHelper + + OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe + instrumentation.instance_variable_set(:@config, config) + instrumentation.instance_variable_set(:@installed, false) + + instrumentation.install(config) + ActiveJob::Base.queue_adapter = :test + + exporter.reset + end + + after do + ActiveJob::Base.queue_adapter = :inline + end + + describe 'when a job is interrupted between steps' do + it 'adds an interrupt event to the process span' do + ContinuableJob.perform_later + interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } + + interrupt_events = process_span.events&.select { |e| e.name == 'interrupt' } || [] + + _(interrupt_events.length).must_equal 1 + end + + it 'includes the interrupt reason' do + ContinuableJob.perform_later + interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } + + interrupt_event = process_span.events.find { |e| e.name == 'interrupt' } + + _(interrupt_event.attributes['messaging.active_job.continuation.reason']).must_equal 'stopping' + end + + it 'includes the description' do + ContinuableJob.perform_later + interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } + + interrupt_event = process_span.events.find { |e| e.name == 'interrupt' } + + _(interrupt_event.attributes['messaging.active_job.continuation.description']).wont_be_nil + end + end + + describe 'when a job is interrupted during a step' do + it 'adds an interrupt event to the process span, not the step span' do + ContinuableWithCursorJob.perform_later + interrupt_job_during_step(ContinuableWithCursorJob, :process_items, cursor: 2) { perform_enqueued_jobs } + + step_span = spans.find { |s| s.name == 'default process_items' } + step_interrupt_events = step_span&.events&.select { |e| e.name == 'interrupt' } || [] + process_interrupt_events = process_span.events&.select { |e| e.name == 'interrupt' } || [] + + _(step_interrupt_events).must_be(:empty?) + _(process_interrupt_events.length).must_equal 1 + end + end + + describe 'when a job completes without interruption' do + it 'does not add an interrupt event' do + ContinuableJob.perform_later + perform_enqueued_jobs + + interrupt_events = process_span.events&.select { |e| e.name == 'interrupt' } || [] + _(interrupt_events).must_be(:empty?) + end + end +end From f93761341a737c9a644fd1c6eb65cb284325ec55 Mon Sep 17 00:00:00 2001 From: Bart de Water <118401830+bdewater-thatch@users.noreply.github.com> Date: Fri, 22 May 2026 17:22:04 -0400 Subject: [PATCH 5/5] Address PR feedback --- .../instrumentation/active_job/handlers.rb | 16 +--- .../active_job/handlers/interrupt.rb | 38 -------- .../active_job/handlers/resume.rb | 40 -------- .../active_job/handlers/step.rb | 8 +- .../active_job/handlers/step_skipped.rb | 35 ------- .../active_job/handlers/interrupt_test.rb | 91 ------------------- .../active_job/handlers/resume_test.rb | 85 ----------------- .../active_job/handlers/step_skipped_test.rb | 75 --------------- .../active_job/handlers/step_test.rb | 14 +-- 9 files changed, 12 insertions(+), 390 deletions(-) delete mode 100644 instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/interrupt.rb delete mode 100644 instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/resume.rb delete mode 100644 instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step_skipped.rb delete mode 100644 instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/interrupt_test.rb delete mode 100644 instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/resume_test.rb delete mode 100644 instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_skipped_test.rb diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb index 1dc6637378..251f11a94b 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers.rb @@ -8,10 +8,7 @@ require_relative 'handlers/default' require_relative 'handlers/enqueue' require_relative 'handlers/perform' -require_relative 'handlers/interrupt' -require_relative 'handlers/resume' require_relative 'handlers/step' -require_relative 'handlers/step_skipped' module OpenTelemetry module Instrumentation @@ -33,11 +30,6 @@ module Handlers # - discard # - step (Continuations, Rails 8.1+) # - # The following events are recorded as span events: - # - resume (Continuations, Rails 8.1+) - # - step_skipped (Continuations, Rails 8.1+) - # - interrupt (Continuations, Rails 8.1+) - # # Ingress and Egress spans (perform, enqueue, enqueue_at) use Messaging semantic conventions for naming the span, # while internal spans keep their ActiveSupport event name. # @@ -59,9 +51,6 @@ def subscribe enqueue_handler = Handlers::Enqueue.new(parent_span_provider, mapper, config) perform_handler = Handlers::Perform.new(parent_span_provider, mapper, config) step_handler = Handlers::Step.new(parent_span_provider, mapper, config) - interrupt_handler = Handlers::Interrupt.new(parent_span_provider) - resume_handler = Handlers::Resume.new(parent_span_provider) - step_skipped_handler = Handlers::StepSkipped.new(parent_span_provider) handlers_by_pattern = { 'enqueue' => enqueue_handler, @@ -70,10 +59,7 @@ def subscribe 'perform' => perform_handler, 'retry_stopped' => default_handler, 'discard' => default_handler, - 'step' => step_handler, - 'resume' => resume_handler, - 'step_skipped' => step_skipped_handler, - 'interrupt' => interrupt_handler + 'step' => step_handler } @subscriptions = handlers_by_pattern.map do |key, handler| diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/interrupt.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/interrupt.rb deleted file mode 100644 index f4b85388d0..0000000000 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/interrupt.rb +++ /dev/null @@ -1,38 +0,0 @@ -# frozen_string_literal: true - -# Copyright The OpenTelemetry Authors -# -# SPDX-License-Identifier: Apache-2.0 - -module OpenTelemetry - module Instrumentation - module ActiveJob - module Handlers - # Handles interrupt.active_job to record a span event when a continuable - # job is interrupted for resumption - class Interrupt - def initialize(parent_span_provider) - @parent_span_provider = parent_span_provider - end - - # @param _name [String] of the Event (unused) - # @param _id [String] of the event (unused) - # @param payload [Hash] containing job run information - def start(_name, _id, payload) - span = @parent_span_provider.current_span - return unless span.recording? - - attributes = { - 'messaging.active_job.continuation.reason' => payload[:reason].to_s, - 'messaging.active_job.continuation.description' => payload[:description] - } - - span.add_event('interrupt', attributes: attributes) - end - - def finish(_name, _id, _payload); end - end - end - end - end -end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/resume.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/resume.rb deleted file mode 100644 index e6e973d757..0000000000 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/resume.rb +++ /dev/null @@ -1,40 +0,0 @@ -# frozen_string_literal: true - -# Copyright The OpenTelemetry Authors -# -# SPDX-License-Identifier: Apache-2.0 - -module OpenTelemetry - module Instrumentation - module ActiveJob - module Handlers - # Handles resume.active_job to record a span event on the ingress span - # when a continuable job resumes from a previous execution - class Resume - def initialize(parent_span_provider) - @parent_span_provider = parent_span_provider - end - - # @param _name [String] of the Event (unused) - # @param _id [String] of the event (unused) - # @param payload [Hash] containing job run information - def start(_name, _id, payload) - span = @parent_span_provider.current_span - return unless span.recording? - - job = payload.fetch(:job) - - attributes = { - 'messaging.active_job.continuation.resumptions' => job.resumptions, - 'messaging.active_job.continuation.description' => payload[:description] - } - - span.add_event('resume', attributes: attributes) - end - - def finish(_name, _id, _payload); end - end - end - end - end -end diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step.rb index 21578d44f0..d1d3ead0c2 100644 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step.rb +++ b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step.rb @@ -23,8 +23,8 @@ def start_span(name, _id, payload) step_name = step.name.to_s attributes = @mapper.call(payload).merge( - 'messaging.active_job.step.name' => step_name, - 'messaging.active_job.step.resumed' => step.resumed? + 'rails.active_job.step.name' => step_name, + 'rails.active_job.step.state' => step.resumed? ? 'resumed' : 'started' ) span_name = span_name(job, step_name) @@ -46,8 +46,8 @@ def finish(_name, _id, payload) token = otel&.fetch(:ctx_token) step = payload.fetch(:step) - span&.set_attribute('messaging.active_job.step.interrupted', payload[:interrupted]) if payload.key?(:interrupted) - span&.set_attribute('messaging.active_job.step.cursor', step.cursor.to_s) if step.cursor + span&.set_attribute('rails.active_job.step.result', 'interrupted') if payload[:interrupted] + span&.set_attribute('rails.active_job.step.cursor', step.cursor.to_s) if step.cursor # Continuation::Interrupt is control flow, not a real error — skip recording it on_exception(payload[:error] || payload[:exception_object], span) unless payload[:interrupted] diff --git a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step_skipped.rb b/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step_skipped.rb deleted file mode 100644 index bf5e454ca1..0000000000 --- a/instrumentation/active_job/lib/opentelemetry/instrumentation/active_job/handlers/step_skipped.rb +++ /dev/null @@ -1,35 +0,0 @@ -# frozen_string_literal: true - -# Copyright The OpenTelemetry Authors -# -# SPDX-License-Identifier: Apache-2.0 - -module OpenTelemetry - module Instrumentation - module ActiveJob - module Handlers - # Handles step_skipped.active_job to record a span event on the ingress span - # when a previously completed step is skipped during a resumed job execution - class StepSkipped - def initialize(parent_span_provider) - @parent_span_provider = parent_span_provider - end - - # @param _name [String] of the Event (unused) - # @param _id [String] of the event (unused) - # @param payload [Hash] containing job run information - def start(_name, _id, payload) - span = @parent_span_provider.current_span - return unless span.recording? - - span.add_event('step_skipped', attributes: { - 'messaging.active_job.step.name' => payload[:step].to_s - }) - end - - def finish(_name, _id, _payload); end - end - end - end - end -end diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/interrupt_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/interrupt_test.rb deleted file mode 100644 index 7ab70122af..0000000000 --- a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/interrupt_test.rb +++ /dev/null @@ -1,91 +0,0 @@ -# frozen_string_literal: true - -# Copyright The OpenTelemetry Authors -# -# SPDX-License-Identifier: Apache-2.0 - -require 'test_helper' - -require_relative '../../../../../lib/opentelemetry/instrumentation/active_job' - -require 'active_job/continuation/test_helper' if defined?(ActiveJob::Continuable) - -describe OpenTelemetry::Instrumentation::ActiveJob::Handlers::Interrupt do - let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } - let(:config) { { propagation_style: :link, span_naming: :queue } } - let(:exporter) { EXPORTER } - let(:spans) { exporter.finished_spans } - let(:process_span) { spans.find { |s| s.name == 'default process' } } - - before do - skip 'Requires ActiveJob::Continuable (Rails 8.1+)' unless defined?(ActiveJob::Continuable) - - singleton_class.include ActiveJob::Continuation::TestHelper - - OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe - instrumentation.instance_variable_set(:@config, config) - instrumentation.instance_variable_set(:@installed, false) - - instrumentation.install(config) - ActiveJob::Base.queue_adapter = :test - - exporter.reset - end - - after do - ActiveJob::Base.queue_adapter = :inline - end - - describe 'when a job is interrupted between steps' do - it 'adds an interrupt event to the process span' do - ContinuableJob.perform_later - interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } - - interrupt_events = process_span.events&.select { |e| e.name == 'interrupt' } || [] - - _(interrupt_events.length).must_equal 1 - end - - it 'includes the interrupt reason' do - ContinuableJob.perform_later - interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } - - interrupt_event = process_span.events.find { |e| e.name == 'interrupt' } - - _(interrupt_event.attributes['messaging.active_job.continuation.reason']).must_equal 'stopping' - end - - it 'includes the description' do - ContinuableJob.perform_later - interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } - - interrupt_event = process_span.events.find { |e| e.name == 'interrupt' } - - _(interrupt_event.attributes['messaging.active_job.continuation.description']).wont_be_nil - end - end - - describe 'when a job is interrupted during a step' do - it 'adds an interrupt event to the process span, not the step span' do - ContinuableWithCursorJob.perform_later - interrupt_job_during_step(ContinuableWithCursorJob, :process_items, cursor: 2) { perform_enqueued_jobs } - - step_span = spans.find { |s| s.name == 'default process_items' } - step_interrupt_events = step_span&.events&.select { |e| e.name == 'interrupt' } || [] - process_interrupt_events = process_span.events&.select { |e| e.name == 'interrupt' } || [] - - _(step_interrupt_events).must_be(:empty?) - _(process_interrupt_events.length).must_equal 1 - end - end - - describe 'when a job completes without interruption' do - it 'does not add an interrupt event' do - ContinuableJob.perform_later - perform_enqueued_jobs - - interrupt_events = process_span.events&.select { |e| e.name == 'interrupt' } || [] - _(interrupt_events).must_be(:empty?) - end - end -end diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/resume_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/resume_test.rb deleted file mode 100644 index 7c45e30608..0000000000 --- a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/resume_test.rb +++ /dev/null @@ -1,85 +0,0 @@ -# frozen_string_literal: true - -# Copyright The OpenTelemetry Authors -# -# SPDX-License-Identifier: Apache-2.0 - -require 'test_helper' - -require_relative '../../../../../lib/opentelemetry/instrumentation/active_job' - -require 'active_job/continuation/test_helper' if defined?(ActiveJob::Continuable) - -describe OpenTelemetry::Instrumentation::ActiveJob::Handlers::Resume do - let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } - let(:config) { { propagation_style: :link, span_naming: :queue } } - let(:exporter) { EXPORTER } - let(:spans) { exporter.finished_spans } - let(:process_spans) { spans.select { |s| s.name == 'default process' } } - - before do - skip 'Requires ActiveJob::Continuable (Rails 8.1+)' unless defined?(ActiveJob::Continuable) - - singleton_class.include ActiveJob::Continuation::TestHelper - - OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe - instrumentation.instance_variable_set(:@config, config) - instrumentation.instance_variable_set(:@installed, false) - - instrumentation.install(config) - ActiveJob::Base.queue_adapter = :test - - exporter.reset - end - - after do - ActiveJob::Base.queue_adapter = :inline - end - - describe 'when a job resumes' do - it 'adds a resume event to the process span' do - ContinuableJob.perform_later - interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } - perform_enqueued_jobs - - resumed_process_span = process_spans.last - resume_events = resumed_process_span.events&.select { |e| e.name == 'resume' } || [] - - _(resume_events.length).must_equal 1 - end - - it 'includes resumptions count' do - ContinuableJob.perform_later - interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } - perform_enqueued_jobs - - resumed_process_span = process_spans.last - resume_event = resumed_process_span.events.find { |e| e.name == 'resume' } - - _(resume_event.attributes['messaging.active_job.continuation.resumptions']).must_equal 1 - end - - it 'includes description' do - ContinuableJob.perform_later - interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } - perform_enqueued_jobs - - resumed_process_span = process_spans.last - resume_event = resumed_process_span.events.find { |e| e.name == 'resume' } - - _(resume_event.attributes['messaging.active_job.continuation.description']).must_equal "after 'first_step'" - end - end - - describe 'when a job has not been interrupted' do - it 'does not add a resume event' do - ContinuableJob.perform_later - perform_enqueued_jobs - - process_span = process_spans.first - resume_events = process_span.events&.select { |e| e.name == 'resume' } || [] - - _(resume_events).must_be(:empty?) - end - end -end diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_skipped_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_skipped_test.rb deleted file mode 100644 index 3e36d8896b..0000000000 --- a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_skipped_test.rb +++ /dev/null @@ -1,75 +0,0 @@ -# frozen_string_literal: true - -# Copyright The OpenTelemetry Authors -# -# SPDX-License-Identifier: Apache-2.0 - -require 'test_helper' - -require_relative '../../../../../lib/opentelemetry/instrumentation/active_job' - -require 'active_job/continuation/test_helper' if defined?(ActiveJob::Continuable) - -describe OpenTelemetry::Instrumentation::ActiveJob::Handlers::StepSkipped do - let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveJob::Instrumentation.instance } - let(:config) { { propagation_style: :link, span_naming: :queue } } - let(:exporter) { EXPORTER } - let(:spans) { exporter.finished_spans } - - before do - skip 'Requires ActiveJob::Continuable (Rails 8.1+)' unless defined?(ActiveJob::Continuable) - - singleton_class.include ActiveJob::Continuation::TestHelper - - OpenTelemetry::Instrumentation::ActiveJob::Handlers.unsubscribe - instrumentation.instance_variable_set(:@config, config) - instrumentation.instance_variable_set(:@installed, false) - - instrumentation.install(config) - ActiveJob::Base.queue_adapter = :test - - exporter.reset - end - - after do - ActiveJob::Base.queue_adapter = :inline - end - - describe 'when a step is skipped on resume' do - it 'adds a step_skipped event to the process span' do - ContinuableJob.perform_later - interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } - exporter.reset - perform_enqueued_jobs - - process_span = spans.find { |s| s.name == 'default process' } - skipped_events = process_span.events&.select { |e| e.name == 'step_skipped' } || [] - - _(skipped_events.length).must_equal 1 - end - - it 'includes the step name' do - ContinuableJob.perform_later - interrupt_job_after_step(ContinuableJob, :first_step) { perform_enqueued_jobs } - exporter.reset - perform_enqueued_jobs - - process_span = spans.find { |s| s.name == 'default process' } - skipped_event = process_span.events.find { |e| e.name == 'step_skipped' } - - _(skipped_event.attributes['messaging.active_job.step.name']).must_equal 'first_step' - end - end - - describe 'when no steps are skipped' do - it 'does not add a step_skipped event' do - ContinuableJob.perform_later - perform_enqueued_jobs - - process_span = spans.find { |s| s.name == 'default process' } - skipped_events = process_span.events&.select { |e| e.name == 'step_skipped' } || [] - - _(skipped_events).must_be(:empty?) - end - end -end diff --git a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_test.rb b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_test.rb index 79157bc847..c03ae3f488 100644 --- a/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_test.rb +++ b/instrumentation/active_job/test/opentelemetry/instrumentation/active_job/handlers/step_test.rb @@ -61,14 +61,14 @@ ContinuableJob.perform_now first_step_span = spans.find { |s| s.name == 'default first_step' } - _(first_step_span.attributes['messaging.active_job.step.name']).must_equal 'first_step' + _(first_step_span.attributes['rails.active_job.step.name']).must_equal 'first_step' end - it 'includes resumed as false for first execution' do + it 'includes state as started for first execution' do ContinuableJob.perform_now first_step_span = spans.find { |s| s.name == 'default first_step' } - _(first_step_span.attributes['messaging.active_job.step.resumed']).must_equal false + _(first_step_span.attributes['rails.active_job.step.state']).must_equal 'started' end it 'includes standard messaging attributes' do @@ -80,11 +80,11 @@ _(first_step_span.attributes['messaging.destination']).must_equal 'default' end - it 'includes interrupted attribute as false when not interrupted' do + it 'does not marks the step as interrupted' do ContinuableJob.perform_now first_step_span = spans.find { |s| s.name == 'default first_step' } - _(first_step_span.attributes['messaging.active_job.step.interrupted']).must_equal false + assert_nil(first_step_span.attributes['rails.active_job.step.result']) end end @@ -93,7 +93,7 @@ ContinuableWithCursorJob.perform_now step_span = spans.find { |s| s.name == 'default process_items' } - _(step_span.attributes['messaging.active_job.step.cursor']).must_equal '3' + _(step_span.attributes['rails.active_job.step.cursor']).must_equal '3' end end @@ -121,7 +121,7 @@ interrupt_job_during_step(ContinuableWithCursorJob, :process_items, cursor: 2) { perform_enqueued_jobs } step_span = spans.find { |s| s.name == 'default process_items' } - _(step_span.attributes['messaging.active_job.step.interrupted']).must_equal true + _(step_span.attributes['rails.active_job.step.result']).must_equal 'interrupted' end end