Skip to content

Commit df047e3

Browse files
committed
fix: replace N+1 lookup with single join query in DelayedJobsRecover
The previous implementation queried dead delayed_jobs then performed separate lookups per row to find the pollable job, entity, and last operation state. Replace with a single 4-table join across service_instance_operations, service_instances, jobs, and delayed_jobs, filtering all conditions in one query
1 parent 0ec67e4 commit df047e3

4 files changed

Lines changed: 332 additions & 37 deletions

File tree

app/jobs/runtime/delayed_jobs_recover.rb

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,6 @@ module VCAP::CloudController
22
module Jobs
33
module Runtime
44
class DelayedJobsRecover < VCAP::CloudController::Jobs::CCJob
5-
RECOVERABLE_OPERATIONS = %w[
6-
service_instance.create
7-
].freeze
8-
95
def perform
106
logger.info('Recover halted delayed jobs')
117
recover
@@ -18,48 +14,52 @@ def max_attempts
1814
private
1915

2016
def recover
21-
# find delayed jobs where failed_at is set (permanently failed)
22-
# and still within the max polling duration (not expired)
17+
# Find stuck service instance create operations where the broker is still working
18+
# but CC's polling job has permanently failed due to a transient error (e.g. brief db connection flip).
19+
# Join path: service_instance_operations → service_instances → jobs → delayed_jobs.
20+
#
21+
# Filters:
22+
# - service_instance_operations.state='in progress': the broker has not yet reported a final state
23+
# (succeeded or failed) that CC could successfully persist; if CC had received and saved a final
24+
# state from the broker, this column would already be 'succeeded' or 'failed' — not 'in progress'
25+
# - service_instance_operations.type='create': scope to create operations only
26+
# - service_instance_operations.created_at > cutoff: operations beyond the max async polling window
27+
# are intentionally excluded — the broker has given up on them too, so re-enqueuing is pointless
28+
# - jobs.state IN (POLLING, FAILED): the pollable job has not reached a terminal success state;
29+
# POLLING covers the case where the failure hook itself couldn't write FAILED due to the DB flip
30+
# - jobs.operation='service_instance.create': prevents matching update/delete jobs for the same
31+
# service instance that happen to share the same resource_guid
32+
# - delayed_jobs.failed_at IS NOT NULL: the delayed job permanently failed (exhausted max_attempts);
33+
# jobs still alive or locked have failed_at=NULL and must not be touched
2334
cutoff_time = Time.now - default_maximum_duration_seconds
24-
dead_delayed_jobs = Delayed::Job.
25-
exclude(failed_at: nil).
26-
where { created_at > cutoff_time }.
27-
order(:created_at).
28-
limit(batch_size)
29-
30-
dead_delayed_jobs.each do |delayed|
31-
# pollable job state can be POLLING or FAILED depending on whether the failure
32-
# hook managed to persist before the db connection was lost
33-
pollable = PollableJobModel.where(delayed_job_guid: delayed.guid).
34-
where(state: [PollableJobModel::POLLING_STATE, PollableJobModel::FAILED_STATE]).
35-
first
36-
next unless pollable
37-
next unless RECOVERABLE_OPERATIONS.include?(pollable.operation)
38-
39-
# last_operation.state must be 'in progress'. This confirms the broker is still
40-
# working on the operation and CC is the one that gave up, not the broker
41-
entity = find_entity(pollable)
42-
next unless entity
43-
next unless entity.last_operation&.state == 'in progress'
35+
stuck = ServiceInstanceOperation.
36+
join(:service_instances, id: Sequel[:service_instance_operations][:service_instance_id]).
37+
join(:jobs, resource_guid: Sequel[:service_instances][:guid]).
38+
join(:delayed_jobs, guid: Sequel[:jobs][:delayed_job_guid]).
39+
where(Sequel[:service_instance_operations][:state] => 'in progress').
40+
where(Sequel[:service_instance_operations][:type] => 'create').
41+
where { Sequel[:service_instance_operations][:created_at] > cutoff_time }.
42+
where(Sequel[:jobs][:state] => [PollableJobModel::POLLING_STATE, PollableJobModel::FAILED_STATE]).
43+
where(Sequel[:jobs][:operation] => 'service_instance.create').
44+
exclude(Sequel[:delayed_jobs][:failed_at] => nil).
45+
select(Sequel[:jobs][:guid].as(:pollable_guid), Sequel[:delayed_jobs][:guid].as(:dj_guid)).
46+
order(Sequel[:service_instance_operations][:created_at]).
47+
limit(batch_size)
4448

45-
reenqueue(pollable, delayed)
46-
end
47-
end
49+
stuck.each do |row|
50+
delayed = Delayed::Job.first(guid: row[:dj_guid])
51+
next unless delayed
4852

49-
def find_entity(pollable)
50-
# TODO: resource_type field can be used
51-
case pollable.operation
52-
when 'service_instance.create'
53-
ManagedServiceInstance.first(guid: pollable.resource_guid)
53+
reenqueue(row[:pollable_guid], delayed)
5454
end
5555
end
5656

57-
def reenqueue(pollable, delayed)
57+
def reenqueue(pollable_guid, delayed)
5858
# re-verify atomically that the pollable job still points to this dead delayed_job.
5959
# if another process already re-enqueued a new job, pollable.delayed_job_guid was
6060
# updated to the new delayed_job's guid, so where clause returns nil and we skip safely.
6161
PollableJobModel.db.transaction do
62-
pjob = PollableJobModel.where(guid: pollable.guid,
62+
pjob = PollableJobModel.where(guid: pollable_guid,
6363
delayed_job_guid: delayed.guid,
6464
state: [PollableJobModel::POLLING_STATE, PollableJobModel::FAILED_STATE]).
6565
for_update.first
@@ -68,7 +68,7 @@ def reenqueue(pollable, delayed)
6868
# bring the pollable job into the clean polling state
6969
pjob.update(cf_api_error: nil, state: PollableJobModel::POLLING_STATE)
7070

71-
# unwrap the serialized handler and re-enqueue via the reoccurring job
71+
# unwrap the serialized handler and re-enqueue via the reoccurring job's enqueue_next_job method
7272
inner_job = Jobs::Enqueuer.unwrap_job(delayed.payload_object)
7373
inner_job.send(:enqueue_next_job, pjob)
7474
end
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
Sequel.migration do
2+
no_transaction # required for concurrently option on postgres
3+
4+
up do
5+
if database_type == :postgres
6+
VCAP::Migration.with_concurrent_timeout(self) do
7+
add_index :jobs, %i[operation state],
8+
name: :jobs_operation_state_index,
9+
where: "state IN ('POLLING', 'FAILED')",
10+
if_not_exists: true,
11+
concurrently: true
12+
end
13+
elsif database_type == :mysql
14+
alter_table(:jobs) do
15+
# rubocop:disable Sequel/ConcurrentIndex -- MySQL does not support concurrent index operations
16+
add_index %i[operation state], name: :jobs_operation_state_index unless @db.indexes(:jobs).key?(:jobs_operation_state_index)
17+
# rubocop:enable Sequel/ConcurrentIndex
18+
end
19+
end
20+
end
21+
22+
down do
23+
if database_type == :postgres
24+
VCAP::Migration.with_concurrent_timeout(self) do
25+
drop_index :jobs, %i[operation state],
26+
name: :jobs_operation_state_index,
27+
if_exists: true,
28+
concurrently: true
29+
end
30+
elsif database_type == :mysql
31+
alter_table(:jobs) do
32+
# rubocop:disable Sequel/ConcurrentIndex
33+
drop_index %i[operation state], name: :jobs_operation_state_index if @db.indexes(:jobs).key?(:jobs_operation_state_index)
34+
# rubocop:enable Sequel/ConcurrentIndex
35+
end
36+
end
37+
end
38+
end
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# rubocop:disable Migration/TooManyMigrationRuns
2+
require 'spec_helper'
3+
require 'migrations/helpers/migration_shared_context'
4+
5+
def operation_state_partial_index_present
6+
# partial indexes are not returned in `db.indexes`. That's why we have to query this information manually.
7+
partial_indexes = db.fetch("SELECT * FROM pg_indexes WHERE tablename = 'jobs' AND indexname = 'jobs_operation_state_index';")
8+
9+
index_present = false
10+
partial_indexes.each do |_index|
11+
index_present = true
12+
end
13+
14+
index_present
15+
end
16+
17+
RSpec.describe 'migration to add operation_state_index on jobs table', isolation: :truncation, type: :migration do
18+
include_context 'migration' do
19+
let(:migration_filename) { '20260505071445_add_jobs_operation_state_index.rb' }
20+
end
21+
22+
describe 'jobs table' do
23+
it 'adds index and handles idempotency gracefully' do
24+
if db.database_type == :postgres
25+
# Test up migration
26+
expect(operation_state_partial_index_present).to be_falsey
27+
expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) }.not_to raise_error
28+
expect(operation_state_partial_index_present).to be_truthy
29+
30+
# Test up migration idempotency
31+
expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) }.not_to raise_error
32+
expect(operation_state_partial_index_present).to be_truthy
33+
34+
# Test down migration
35+
expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) }.not_to raise_error
36+
expect(operation_state_partial_index_present).to be_falsey
37+
38+
# Test down migration idempotency
39+
expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) }.not_to raise_error
40+
expect(operation_state_partial_index_present).to be_falsey
41+
42+
elsif db.database_type == :mysql
43+
# Test up migration
44+
expect(db.indexes(:jobs)).not_to include(:jobs_operation_state_index)
45+
expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) }.not_to raise_error
46+
expect(db.indexes(:jobs)).to include(:jobs_operation_state_index)
47+
48+
# Test up migration idempotency
49+
expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) }.not_to raise_error
50+
expect(db.indexes(:jobs)).to include(:jobs_operation_state_index)
51+
52+
# Test down migration
53+
expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) }.not_to raise_error
54+
expect(db.indexes(:jobs)).not_to include(:jobs_operation_state_index)
55+
56+
# Test down migration idempotency
57+
expect { Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) }.not_to raise_error
58+
expect(db.indexes(:jobs)).not_to include(:jobs_operation_state_index)
59+
end
60+
end
61+
end
62+
end
63+
# rubocop:enable Migration/TooManyMigrationRuns

0 commit comments

Comments
 (0)