Skip to content

Commit aa211e6

Browse files
authored
Fix broker stuck in SYNCHRONIZING on DB error during rollback (#4995)
Service brokers can become permanently stuck in the SYNCHRONIZING state when a database connection failure occurs while a failed job attempts to revert the broker state. Without intervention, the broker remains unusable even after the database recovers.

This change implements a multi-layered error-handling approach:

1. Immediate rollback: best-effort state reversion in the job's rescue block, with graceful error handling that doesn't mask the original failure.
2. Failure recovery hook: a new recover_from_failure method invoked when jobs transition to FAILED state after retries are exhausted. This serves as a safety net to set the broker to SYNCHRONIZATION_FAILED when the database becomes available again.
3. Conditional updates: WHERE clauses ensure only SYNCHRONIZING brokers are affected, protecting against overwriting newer states.

The failure-hook infrastructure is implemented in PollableJobWrapper and WrappingJob, allowing any job to implement recover_from_failure for cleanup when transitioning to permanent failure.

Changes:
- Add PollableJobWrapper#failure hook that calls recover_from_failure
- Add WrappingJob#recover_from_failure delegation with a respond_to? check
- Implement recover_from_failure in UpdateBrokerJob and SynchronizeBrokerCatalogJob to set brokers to SYNCHRONIZATION_FAILED
- Add graceful error handling to rollback_broker_state
- Add comprehensive test coverage for all new behavior
1 parent aa48775 commit aa211e6

File tree

8 files changed

+260
-3
lines changed

8 files changed

+260
-3
lines changed

app/jobs/pollable_job_wrapper.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ def error(job, exception)
7070
end
7171

7272
# Hook invoked when the delayed job permanently fails (retries exhausted).
# Gives the wrapped handler a chance to clean up via recover_from_failure,
# then records the FAILED state on the pollable job. Any StandardError from
# recovery is logged and swallowed so the state transition always happens.
def failure(job)
  recovery_error = nil
  begin
    recover_from_failure
  rescue StandardError => e
    recovery_error = e
  end
  logger.error("failure recovery failed: #{recovery_error.class}: #{recovery_error.message}") if recovery_error

  change_state(job, PollableJobModel::FAILED_STATE)
end
7581

app/jobs/v3/services/synchronize_broker_catalog_job.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ def display_name
3434
'service_broker.catalog.synchronize'
3535
end
3636

37+
# Safety-net cleanup invoked when the job transitions to permanent failure:
# moves a broker still stuck in SYNCHRONIZING to SYNCHRONIZATION_FAILED.
# The state filter in the WHERE clause leaves brokers untouched if a newer
# operation has already moved them to another state. Errors (e.g. the
# database still being unavailable) are logged rather than raised.
def recover_from_failure
  stuck_brokers = ServiceBroker.where(guid: broker_guid, state: ServiceBrokerStateEnum::SYNCHRONIZING)
  stuck_brokers.update(state: ServiceBrokerStateEnum::SYNCHRONIZATION_FAILED)
rescue StandardError => e
  Steno.logger('cc.background').error("Failed to recover broker state for #{broker_guid}: #{e.class}: #{e.message}")
end
44+
3745
private
3846

3947
attr_reader :broker_guid, :user_audit_info

app/jobs/v3/services/update_broker_job.rb

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ def display_name
3737
'service_broker.update'
3838
end
3939

40+
# Safety-net cleanup invoked via the pollable-job failure hook after retries
# are exhausted: flips a broker left in SYNCHRONIZING by this update job to
# SYNCHRONIZATION_FAILED. The conditional WHERE ensures only still-stuck
# brokers are affected. Never raises; failures are logged best-effort.
def recover_from_failure
  dataset = ServiceBroker.where(guid: broker_guid, state: ServiceBrokerStateEnum::SYNCHRONIZING)
  dataset.update(state: ServiceBrokerStateEnum::SYNCHRONIZATION_FAILED)
rescue StandardError => e
  Steno.logger('cc.background').error("Failed to recover broker state for #{broker_guid}: #{e.class}: #{e.message}")
end
47+
4048
private
4149

4250
attr_reader :update_request_guid, :broker_guid, :previous_broker_state, :user_audit_info
@@ -66,17 +74,32 @@ def perform
6674

6775
@warnings
6876
rescue StandardError => e
69-
ServiceBroker.where(id: update_request.service_broker_id).update(state: previous_broker_state)
77+
rollback_broker_state
7078

7179
raise V3::ServiceBrokerUpdate::InvalidServiceBroker.new(e.message) if e.is_a?(Sequel::ValidationFailed)
7280

73-
raise e
81+
raise
7482
ensure
75-
update_request.destroy
83+
destroy_update_request
7684
end
7785

7886
private
7987

88+
# Best-effort revert of the broker to its pre-update state after a failed
# update. The SYNCHRONIZING filter guarantees we never clobber a state set
# by a newer operation. Errors must never escape: the caller re-raises the
# original failure, and the wrapper's failure hook (recover_from_failure)
# acts as the retry safety net once the database is reachable again.
def rollback_broker_state
  return unless update_request

  ServiceBroker.where(id: update_request.service_broker_id, state: ServiceBrokerStateEnum::SYNCHRONIZING).
    update(state: previous_broker_state)
rescue StandardError => e
  # Previously swallowed silently; log so operators can see why the rollback
  # did not happen. Wrapper failure hook will retry the state recovery.
  Steno.logger('cc.background').error("Failed to roll back broker state: #{e.class}: #{e.message}")
end
96+
97+
# Cleanup of the transient update-request record, run from the ensure block
# of perform. Must never raise: an exception here would mask the original
# error (or clobber the job's successful return). The failure is logged
# instead of silently dropped so a leaked ServiceBrokerUpdateRequest row
# can be traced back to its cause.
def destroy_update_request
  update_request&.destroy
rescue StandardError => e
  Steno.logger('cc.background').error("Failed to destroy update request #{update_request_guid}: #{e.class}: #{e.message}")
end
102+
80103
def update_params
81104
params = {}
82105
params[:name] = update_request.name unless update_request.name.nil?

app/jobs/wrapping_job.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ def error(job, e)
5151
handler.error(job, e) if handler.respond_to?(:error)
5252
end
5353

54+
# Forwards the permanent-failure recovery hook to the wrapped handler when
# it opts in; handlers without a recover_from_failure method are skipped.
def recover_from_failure
  return unless handler.respond_to?(:recover_from_failure)

  handler.recover_from_failure
end
57+
5458
def display_name
5559
handler.respond_to?(:display_name) ? handler.display_name : handler.class.name
5660
end

spec/unit/jobs/pollable_job_wrapper_spec.rb

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,57 @@ class BigException < StandardError
201201
execute_all_jobs(expected_successes: 0, expected_failures: 1)
202202
end
203203
end
204+
205+
# Covers the permanent-failure hook: recovery delegation, tolerance of
# recovery errors, and the guaranteed transition of the pollable job to FAILED.
describe '#failure' do
  let(:delayed_job) { instance_double(Delayed::Backend::Sequel::Job, guid: 'job-guid') }
  # let! so the record exists before the hook runs, not lazily on first reference.
  let!(:pollable_job) do
    VCAP::CloudController::PollableJobModel.make(delayed_job_guid: 'job-guid', state: VCAP::CloudController::PollableJobModel::PROCESSING_STATE)
  end

  context 'when handler implements recover_from_failure' do
    let(:handler) do
      instance_double(VCAP::CloudController::V3::UpdateBrokerJob, recover_from_failure: nil, warnings: nil)
    end
    let(:wrapper) { PollableJobWrapper.new(handler) }

    it 'calls recover_from_failure and marks the pollable job failed' do
      wrapper.failure(delayed_job)

      expect(handler).to have_received(:recover_from_failure)
      expect(pollable_job.reload.state).to eq(VCAP::CloudController::PollableJobModel::FAILED_STATE)
    end

    context 'when recover_from_failure raises an error' do
      let(:logger) { instance_double(Steno::Logger, error: nil) }

      before do
        allow(handler).to receive(:recover_from_failure).and_raise(StandardError.new('recovery failed'))
        allow(Steno).to receive(:logger).with('cc.pollable.job.wrapper').and_return(logger)
      end

      it 'logs the error without re-raising' do
        expect { wrapper.failure(delayed_job) }.not_to raise_error
        expect(logger).to have_received(:error).with(/failure recovery failed/)
      end

      # A recovery failure must not block the FAILED-state bookkeeping.
      it 'still marks the pollable job as failed' do
        wrapper.failure(delayed_job)

        expect(pollable_job.reload.state).to eq(VCAP::CloudController::PollableJobModel::FAILED_STATE)
      end
    end
  end

  context 'when handler does not implement recover_from_failure' do
    # Plain double without the hook; presumably the wrapper's respond_to?
    # guard (inherited delegation) makes this a no-op — confirmed by the spec.
    let(:handler) { double('HandlerWithoutRecovery', warnings: nil) }
    let(:wrapper) { PollableJobWrapper.new(handler) }

    it 'still marks the pollable job as failed without error' do
      expect { wrapper.failure(delayed_job) }.not_to raise_error
      expect(pollable_job.reload.state).to eq(VCAP::CloudController::PollableJobModel::FAILED_STATE)
    end
  end
end
204255
end
205256
end
206257

spec/unit/jobs/v3/services/synchronize_broker_catalog_job_spec.rb

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,70 @@ def incompatible_catalog
201201
allow(catalog).to receive_messages(valid?: true, compatible?: false, incompatibility_errors: incompatibility_errors)
202202
end
203203
end
204+
205+
# Verifies the failure-recovery safety net: only SYNCHRONIZING brokers are
# flipped to SYNCHRONIZATION_FAILED, and database errors are logged, not raised.
describe '#recover_from_failure' do
  let(:broker) do
    ServiceBroker.create(
      name: 'test-broker',
      broker_url: 'http://example.org/broker-url',
      auth_username: 'username',
      auth_password: 'password'
    )
  end
  let(:user_audit_info) { instance_double(UserAuditInfo, { user_guid: Sham.guid }) }

  subject(:job) do
    SynchronizeBrokerCatalogJob.new(broker.guid, user_audit_info:)
  end

  context 'when broker is in SYNCHRONIZING state' do
    before do
      broker.update(state: ServiceBrokerStateEnum::SYNCHRONIZING)
    end

    it 'sets the broker to SYNCHRONIZATION_FAILED' do
      expect do
        job.recover_from_failure
      end.to change { broker.reload.state }.
        from(ServiceBrokerStateEnum::SYNCHRONIZING).
        to(ServiceBrokerStateEnum::SYNCHRONIZATION_FAILED)
    end
  end

  # Shared assertion: the conditional WHERE clause must leave any
  # non-SYNCHRONIZING state untouched.
  shared_examples 'does not change the broker state' do |expected_state|
    it 'leaves the state unchanged' do
      broker.update(state: expected_state)
      job.recover_from_failure

      expect(broker.reload.state).to eq(expected_state)
    end
  end

  context 'when broker is in a different state' do
    include_examples 'does not change the broker state', ServiceBrokerStateEnum::AVAILABLE
    include_examples 'does not change the broker state', ServiceBrokerStateEnum::DELETE_IN_PROGRESS
  end

  context 'when database error occurs during recovery' do
    let(:dataset) { instance_double(Sequel::Dataset) }
    let(:logger) { instance_double(Steno::Logger, error: nil) }

    before do
      broker.update(state: ServiceBrokerStateEnum::SYNCHRONIZING)
      # Stub only the recovery query so its update raises like a dropped connection.
      allow(ServiceBroker).to receive(:where).
        with(guid: broker.guid, state: ServiceBrokerStateEnum::SYNCHRONIZING).
        and_return(dataset)
      allow(dataset).to receive(:update).and_raise(Sequel::DatabaseError.new(RuntimeError.new('connection lost')))
      allow(Steno).to receive(:logger).with('cc.background').and_return(logger)
    end

    it 'logs the error and does not raise' do
      expect { job.recover_from_failure }.not_to raise_error
      expect(logger).to have_received(:error).with(/Failed to recover broker state/)
      expect(broker.reload.state).to eq(ServiceBrokerStateEnum::SYNCHRONIZING)
    end
  end
end
204268
end
205269
end
206270
end

spec/unit/jobs/v3/services/update_broker_job_spec.rb

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,29 @@ module V3
446446
end
447447
end
448448

449+
# Regression test for brokers stuck in SYNCHRONIZING: a DB failure during the
# rescue-block rollback must not replace the original job error, and the
# ensure-block cleanup must still run.
context 'when database disconnects during state rollback' do
  let(:catalog_error) { StandardError.new('Catalog fetch failed') }
  let(:mock_dataset) { instance_double(Sequel::Postgres::Dataset) }

  before do
    allow_any_instance_of(VCAP::CloudController::V3::ServiceBrokerCatalogUpdater).to receive(:refresh).and_raise(catalog_error)
    allow(mock_dataset).to receive(:update).and_raise(Sequel::DatabaseDisconnectError.new('connection lost'))
    # Only the rollback query is stubbed to fail; all other where calls behave normally.
    allow(ServiceBroker).to receive(:where).and_call_original
    allow(ServiceBroker).to receive(:where).
      with(id: update_broker_request.service_broker_id, state: ServiceBrokerStateEnum::SYNCHRONIZING).
      and_return(mock_dataset)
  end

  it 're-raises the original error instead of the rollback database error' do
    expect { job.perform }.to raise_error(catalog_error)
  end

  it 'still cleans up the update request' do
    expect { job.perform }.to raise_error(catalog_error)
    expect(ServiceBrokerUpdateRequest.where(id: update_broker_request.id).all).to be_empty
  end
end
471+
449472
context 'when the broker ceases to exist during the job' do
450473
it 'raises a ServiceBrokerGone error' do
451474
broker.destroy
@@ -457,6 +480,62 @@ module V3
457480
end
458481
end
459482

483+
# Mirrors the SynchronizeBrokerCatalogJob recovery specs for UpdateBrokerJob:
# stuck SYNCHRONIZING brokers are moved to SYNCHRONIZATION_FAILED, other
# states are preserved, and DB errors during recovery are logged, not raised.
describe '#recover_from_failure' do
  let(:previous_state) { ServiceBrokerStateEnum::AVAILABLE }

  subject(:job) do
    UpdateBrokerJob.new(update_broker_request.guid, broker.guid, previous_state, user_audit_info:)
  end

  context 'when broker is in SYNCHRONIZING state' do
    before do
      broker.update(state: ServiceBrokerStateEnum::SYNCHRONIZING)
    end

    it 'sets the broker to SYNCHRONIZATION_FAILED' do
      expect do
        job.recover_from_failure
      end.to change { broker.reload.state }.
        from(ServiceBrokerStateEnum::SYNCHRONIZING).
        to(ServiceBrokerStateEnum::SYNCHRONIZATION_FAILED)
    end
  end

  # Shared assertion: the conditional WHERE clause must leave any
  # non-SYNCHRONIZING state untouched.
  shared_examples 'does not change the broker state' do |expected_state|
    it 'leaves the state unchanged' do
      broker.update(state: expected_state)
      job.recover_from_failure

      expect(broker.reload.state).to eq(expected_state)
    end
  end

  context 'when broker is in a different state' do
    include_examples 'does not change the broker state', ServiceBrokerStateEnum::AVAILABLE
    include_examples 'does not change the broker state', ServiceBrokerStateEnum::DELETE_IN_PROGRESS
  end

  context 'when database error occurs during recovery' do
    let(:dataset) { instance_double(Sequel::Dataset) }
    let(:logger) { instance_double(Steno::Logger, error: nil) }

    before do
      broker.update(state: ServiceBrokerStateEnum::SYNCHRONIZING)
      # Stub only the recovery query so its update raises like a dropped connection.
      allow(ServiceBroker).to receive(:where).
        with(guid: broker.guid, state: ServiceBrokerStateEnum::SYNCHRONIZING).
        and_return(dataset)
      allow(dataset).to receive(:update).and_raise(Sequel::DatabaseError.new(RuntimeError.new('connection lost')))
      allow(Steno).to receive(:logger).with('cc.background').and_return(logger)
    end

    it 'logs the error and does not raise' do
      expect { job.recover_from_failure }.not_to raise_error
      expect(logger).to have_received(:error).with(/Failed to recover broker state/)
      expect(broker.reload.state).to eq(ServiceBrokerStateEnum::SYNCHRONIZING)
    end
  end
end
538+
460539
def setup_broker_with_invalid_catalog
461540
catalog = instance_double(Services::ServiceBrokers::V2::Catalog)
462541

spec/unit/jobs/wrapping_job_spec.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,28 @@ module Jobs
198198
end
199199
end
200200
end
201+
202+
# Verifies the delegation contract: the hook is forwarded to handlers that
# implement it, and is a safe no-op for handlers that do not.
describe '#recover_from_failure' do
  context 'when the wrapped job has the recover_from_failure method defined' do
    it 'delegates to the handler' do
      handler = double('Job', recover_from_failure: nil)
      job = WrappingJob.new(handler)

      expect(handler).to receive(:recover_from_failure)
      job.recover_from_failure
    end
  end

  context 'when the wrapped job does not have the recover_from_failure method defined' do
    it 'does not raise an exception' do
      # Bare Object: respond_to?(:recover_from_failure) is false, so the
      # delegation guard must short-circuit.
      handler = Object.new
      job = WrappingJob.new(handler)
      expect do
        job.recover_from_failure
      end.not_to raise_error
    end
  end
end
201223
end
202224
end
203225
end

0 commit comments

Comments
 (0)