Skip to content

Commit f1bd777

Browse files
committed
Reload process to use DB-assigned updated_at timestamps
When a process is saved, Sequel's timestamps plugin sets updated_at in-memory before the DB transaction commits. If the runner is called after the transaction commits (via after_commit callback), it receives a stale in-memory timestamp that may differ from the DB-assigned value. ProcessesSync compares process.updated_at against the LRP annotation to detect drift. A stale timestamp causes unnecessary re-sync requests. This commit adds process.reload before passing the process to runners in ProcessObserver, ProcessRouteHandler, and AppUpdate. This ensures the runner receives the DB-assigned timestamp. Also removes an incorrect comment in ProcessRestart about timestamp precision differences - testing confirmed the timestamps match when the runner is called inside the transaction. Adds skip_process_observer_on_update in AppUpdate to prevent redundant ProcessObserver.updated calls when metric tags are being updated.
1 parent 36b3697 commit f1bd777

File tree

9 files changed

+111
-16
lines changed

9 files changed

+111
-16
lines changed

app/actions/app_update.rb

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,20 @@ def update(app, message, lifecycle)
4444
)
4545

4646
# update process timestamp to trigger convergence if sending fails
47-
app.processes.each(&:save) if updating_metric_tags?(message)
47+
if updating_metric_tags?(message)
48+
app.processes.each do |process|
49+
process.skip_process_observer_on_update = true
50+
process.save
51+
end
52+
end
4853
end
4954

5055
if updating_metric_tags?(message)
5156
app.processes.each do |process|
52-
@runners.runner_for_process(process).update_metric_tags if process.state == ProcessModel::STARTED
57+
# Reload to get the DB-assigned updated_at, which ProcessesSync compares against
58+
# the LRP annotation to detect drift. Without this, the stale in-memory value
59+
# causes an unnecessary re-sync.
60+
@runners.runner_for_process(process.reload).update_metric_tags if process.state == ProcessModel::STARTED
5361
rescue Diego::Runner::CannotCommunicateWithDiegoError => e
5462
@logger.error("failed communicating with diego backend: #{e.message}")
5563
end

app/actions/process_restart.rb

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,6 @@ def restart(process:, config:, stop_in_runtime:, revision: nil)
99
process.lock!
1010
process.skip_process_observer_on_update = true
1111

12-
# A side-effect of submitting LRP requests before the transaction commits is that
13-
# the annotation timestamp stored on the LRP is slightly different than the process.updated_at
14-
# timestamp that is stored in the DB due to timestamp precision differences.
15-
# This difference causes the sync job to submit an extra update LRP request which updates
16-
# the LRP annotation at a later time. This should have no impact on the running LRP.
1712
if need_to_stop_in_runtime
1813
process.update(state: ProcessModel::STOPPED)
1914
runners(config).runner_for_process(process).stop

lib/cloud_controller/process_observer.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,15 @@ def react_to_state_change(process)
3737

3838
process.update(revision: process.app.latest_revision) if process.revisions_enabled?
3939

40-
@runners.runner_for_process(process).start unless process.needs_staging?
40+
# Reload to get the DB-assigned updated_at, which ProcessesSync compares against
41+
# the LRP annotation to detect drift. Without this, the stale in-memory value
42+
# causes an unnecessary re-sync.
43+
@runners.runner_for_process(process.reload).start unless process.needs_staging?
4144
end
4245

4346
def react_to_instances_change(process)
44-
@runners.runner_for_process(process).scale if process.started? && process.active?
47+
# Same as above: reload to get the DB-assigned updated_at before building the LRP annotation.
48+
@runners.runner_for_process(process.reload).scale if process.started? && process.active?
4549
end
4650

4751
def with_diego_communication_handling

lib/cloud_controller/process_route_handler.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ def update_route_information(perform_validation: true, updated_ports: false)
2828
end
2929

3030
def notify_backend_of_route_update
31-
@runners.runner_for_process(@process).update_routes if @process && @process.staged? && @process.started?
31+
# Reload to get the DB-assigned updated_at, which ProcessesSync compares against
32+
# the LRP annotation to detect drift. Without this, the stale in-memory value
33+
# causes an unnecessary re-sync.
34+
@runners.runner_for_process(@process.reload).update_routes if @process && @process.staged? && @process.started?
3235
rescue Diego::Runner::CannotCommunicateWithDiegoError => e
3336
logger.error("failed communicating with diego backend: #{e.message}")
3437
end

spec/unit/actions/app_update_spec.rb

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,25 @@ module VCAP::CloudController
103103
expect(runner).to have_received(:update_metric_tags).twice
104104
end
105105

106+
it 'does not trigger ProcessObserver when saving processes', isolation: :truncation do
107+
expect(ProcessObserver).not_to receive(:updated)
108+
app_update.update(app_model, message, lifecycle)
109+
end
110+
111+
it 'passes a process with the correct updated_at timestamp to the runner', isolation: :truncation do
112+
received = {}
113+
allow(runners).to receive(:runner_for_process) do |p|
114+
received[p.guid] = p.updated_at
115+
runner
116+
end
117+
118+
app_update.update(app_model, message, lifecycle)
119+
120+
expect(runners).to have_received(:runner_for_process).twice
121+
expect(received[web_process.guid]).to eq(web_process.reload.updated_at)
122+
expect(received[worker_process.guid]).to eq(worker_process.reload.updated_at)
123+
end
124+
106125
context 'when there is a CannotCommunicateWithDiegoError' do
107126
before do
108127
allow(runner).to receive(:update_metric_tags).and_invoke(
@@ -130,10 +149,12 @@ module VCAP::CloudController
130149
let!(:worker_process) { instance_double(VCAP::CloudController::ProcessModel) }
131150

132151
before do
152+
allow(web_process).to receive(:skip_process_observer_on_update=)
133153
allow(web_process).to receive(:save)
134-
allow(web_process).to receive(:state).and_return(ProcessModel::STARTED)
154+
allow(web_process).to receive_messages(reload: web_process, state: ProcessModel::STARTED)
155+
allow(worker_process).to receive(:skip_process_observer_on_update=)
135156
allow(worker_process).to receive(:save)
136-
allow(worker_process).to receive(:state).and_return(ProcessModel::STARTED)
157+
allow(worker_process).to receive_messages(reload: worker_process, state: ProcessModel::STARTED)
137158
allow(app_model).to receive(:processes).and_return([web_process, worker_process])
138159
end
139160

@@ -180,10 +201,12 @@ module VCAP::CloudController
180201
let!(:worker_process) { instance_double(VCAP::CloudController::ProcessModel) }
181202

182203
before do
204+
allow(web_process).to receive(:skip_process_observer_on_update=)
183205
allow(web_process).to receive(:save)
184-
allow(web_process).to receive(:state).and_return(ProcessModel::STARTED)
206+
allow(web_process).to receive_messages(reload: web_process, state: ProcessModel::STARTED)
207+
allow(worker_process).to receive(:skip_process_observer_on_update=)
185208
allow(worker_process).to receive(:save)
186-
allow(worker_process).to receive(:state).and_return(ProcessModel::STARTED)
209+
allow(worker_process).to receive_messages(reload: worker_process, state: ProcessModel::STARTED)
187210
allow(app_model).to receive(:processes).and_return([web_process, worker_process])
188211
end
189212

spec/unit/actions/process_restart_spec.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,19 @@ module VCAP::CloudController
3636
ProcessRestart.restart(process: process, config: config, stop_in_runtime: true)
3737
end
3838

39+
it 'passes a process with the correct updated_at timestamp to the runner', isolation: :truncation do
40+
received = nil
41+
allow(VCAP::CloudController::Diego::Runner).to receive(:new) do |p, _config|
42+
received = p.updated_at
43+
runner
44+
end
45+
46+
ProcessRestart.restart(process: process, config: config, stop_in_runtime: false)
47+
48+
expect(VCAP::CloudController::Diego::Runner).to have_received(:new).once
49+
expect(received).to eq(process.reload.updated_at)
50+
end
51+
3952
context 'when the process is STARTED' do
4053
it 'keeps process state as STARTED' do
4154
ProcessRestart.restart(process: process, config: config, stop_in_runtime: true)

spec/unit/lib/cloud_controller/process_observer_spec.rb

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ module VCAP::CloudController
55
let(:stagers) { double(:stagers, stager_for_build: stager) }
66
let(:runners) { instance_double(Runners, runner_for_process: runner) }
77
let(:stager) { double(:stager) }
8-
let(:runner) { instance_double(Diego::Runner, stop: nil, start: nil) }
8+
let(:runner) { instance_double(Diego::Runner, stop: nil, start: nil, scale: nil) }
99
let(:process_active) { true }
1010
let(:diego) { false }
1111
let(:process) do
@@ -33,6 +33,7 @@ module VCAP::CloudController
3333

3434
before do
3535
ProcessObserver.configure(stagers, runners)
36+
allow(process).to receive(:reload).and_return(process)
3637
end
3738

3839
describe '.deleted' do
@@ -312,6 +313,37 @@ module VCAP::CloudController
312313
end
313314
end
314315
end
316+
317+
context 'updated_at annotation accuracy' do
318+
let(:received) { [] }
319+
320+
before do
321+
allow(runners).to receive(:runner_for_process) do |p|
322+
received << p.updated_at
323+
runner
324+
end
325+
end
326+
327+
context 'when the process instances have changed' do
328+
it 'passes a process with the correct updated_at timestamp to the runner', isolation: :truncation do
329+
process_model = ProcessModelFactory.make(state: 'STARTED')
330+
process_model.update(instances: process_model.instances + 1)
331+
332+
expect(runners).to have_received(:runner_for_process).once
333+
expect(received.last).to eq(process_model.reload.updated_at)
334+
end
335+
end
336+
337+
context 'when the process state has changed' do
338+
it 'passes a process with the correct updated_at timestamp to the runner', isolation: :truncation do
339+
process_model = ProcessModelFactory.make(state: 'STOPPED')
340+
process_model.update(state: ProcessModel::STARTED)
341+
342+
expect(runners).to have_received(:runner_for_process).once
343+
expect(received.last).to eq(process_model.reload.updated_at)
344+
end
345+
end
346+
end
315347
end
316348
end
317349
end

spec/unit/lib/cloud_controller/process_route_handler_spec.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,5 +196,22 @@ module VCAP::CloudController
196196
end
197197
end
198198
end
199+
200+
describe 'updated_at annotation accuracy' do
201+
let(:process) { ProcessModelFactory.make(state: 'STARTED') }
202+
203+
it 'passes a process with the correct updated_at timestamp to the runner', isolation: :truncation do
204+
received = nil
205+
allow(runners).to receive(:runner_for_process) do |p|
206+
received = p.updated_at
207+
runner
208+
end
209+
210+
handler.update_route_information
211+
212+
expect(runners).to have_received(:runner_for_process).once
213+
expect(received).to eq(process.reload.updated_at)
214+
end
215+
end
199216
end
200217
end

spec/unit/models/runtime/process_model_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1495,7 +1495,7 @@ def act_as_cf_admin
14951495
end
14961496

14971497
describe 'saving' do
1498-
it 'calls AppObserver.updated', isolation: :truncation do
1498+
it 'calls ProcessObserver.updated', isolation: :truncation do
14991499
process = ProcessModelFactory.make
15001500
expect(ProcessObserver).to receive(:updated).with(process)
15011501
process.update(instances: process.instances + 1)

0 commit comments

Comments
 (0)