-
Notifications
You must be signed in to change notification settings - Fork 368
Expand file tree
/
Copy pathtasks_sync.rb
More file actions
152 lines (125 loc) · 4.65 KB
/
tasks_sync.rb
File metadata and controls
152 lines (125 loc) · 4.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
module VCAP::CloudController
  module Diego
    # Reconciles Cloud Controller task records with the tasks reported by Diego
    # (through the BBS task client).
    #
    # Each #sync pass:
    # * fails CC tasks that have stayed PENDING longer than PENDING_TASK_EXPIRATION_IN_SECONDS,
    # * fails RUNNING/CANCELING CC tasks that Diego no longer reports,
    # * asks Diego to cancel tasks that CC has in CANCELING,
    # * asks Diego to cancel tasks that have no CC record at all.
    #
    # Per-task work is submitted to a WorkPool. The BBS freshness marker is only
    # bumped when the fetch, the scheduling and every submitted job succeeded.
    class TasksSync
      BATCH_SIZE = 500
      PENDING_TASK_EXPIRATION_IN_SECONDS = 300

      class Error < StandardError
      end

      class BBSFetchError < Error
      end

      # @param config [Object] Cloud Controller config; stored but not read elsewhere in this class
      def initialize(config:)
        @config = config
        @workpool = WorkPool.new(50, store_exceptions: true)
      end

      # Runs one reconciliation pass between CC tasks and Diego tasks.
      #
      # @raise [BBSFetchError] when a CloudController::Errors::ApiError is raised during the pass
      # @raise [StandardError] any other error is logged and re-raised unchanged
      def sync
        logger.info('run-task-sync')
        @bump_freshness = true
        diego_tasks = bbs_task_client.fetch_tasks.index_by(&:task_guid)
        # A single cutoff for the whole pass, so every batch is judged against the same threshold.
        expiration_cutoff = Time.now.utc - PENDING_TASK_EXPIRATION_IN_SECONDS

        to_update = []
        to_cancel = []
        expired_pending = []

        batched_cc_tasks do |cc_tasks|
          cc_tasks.each do |cc_task|
            # Remove every CC-known guid; whatever is left in diego_tasks afterwards has no CC record.
            diego_task = diego_tasks.delete(cc_task.guid)

            if cc_task.state == TaskModel::PENDING_STATE
              # NOTE(review): an expired PENDING task that Diego does report is still failed here
              # and is not cancelled in Diego — confirm this is intended.
              expired_pending << cc_task.guid if cc_task.created_at < expiration_cutoff
              next
            end

            next unless [TaskModel::RUNNING_STATE, TaskModel::CANCELING_STATE].include?(cc_task.state)

            if diego_task.nil?
              to_update << cc_task.guid
            elsif cc_task.state == TaskModel::CANCELING_STATE
              to_cancel << cc_task.guid
            end
          end
        end

        fail_expired_pending_tasks(expired_pending)
        update_missing_diego_tasks(to_update)
        cancel_cc_tasks(to_cancel)
        cancel_missing_cc_tasks(diego_tasks)

        workpool.drain

        process_workpool_exceptions(workpool.exceptions)
      rescue CloudController::Errors::ApiError => e
        logger.info('sync-failed', error: e.name, error_message: e.message)
        @bump_freshness = false
        raise BBSFetchError, e.message
      rescue StandardError => e
        logger.info('sync-failed', error: e.class.name, error_message: e.message)
        @bump_freshness = false
        raise
      ensure
        # Always wait for submitted jobs, even when the pass aborted part-way.
        workpool.drain
        if @bump_freshness
          bbs_task_client.bump_freshness
          logger.info('finished-task-sync')
        else
          logger.info('sync-failed')
        end
      end

      private

      attr_reader :workpool

      # Logs every exception collected by the workpool and withholds the freshness bump if there were any.
      def process_workpool_exceptions(exceptions)
        exceptions.each do |e|
          logger.error('error-cancelling-task', error: e.class.name, error_message: e.message, error_backtrace: formatted_backtrace_from_error(e))
          @bump_freshness = false
        end
      end

      # Returns the error's backtrace as newline-joined text followed by "\n...", or '' when there is none.
      def formatted_backtrace_from_error(error)
        error.backtrace.present? ? error.backtrace.join("\n") + "\n..." : ''
      end

      # For each guid, re-checks with Diego and fails the CC task if Diego still has no record of it.
      def update_missing_diego_tasks(to_update)
        to_update.each do |task_guid|
          workpool.submit(task_guid) do |guid|
            if bbs_task_client.fetch_task(guid).nil?
              # Mark the CC task as failed. Don't update tasks that are already in a terminal state.
              task = TaskModel.where(guid:).exclude(state: [TaskModel::FAILED_STATE, TaskModel::SUCCEEDED_STATE]).first
              task&.update(state: TaskModel::FAILED_STATE, failure_reason: BULKER_TASK_FAILURE) # invoke model's update method to create an event
              logger.info('missing-diego-task', task_guid: guid)
            end
          end
        end
      end

      # Asks Diego to cancel tasks that CC has in CANCELING state.
      def cancel_cc_tasks(to_cancel)
        to_cancel.each do |task_guid|
          workpool.submit(task_guid) do |guid|
            bbs_task_client.cancel_task(guid)
            logger.info('canceled-cc-task', task_guid: guid)
          end
        end
      end

      # Fails CC tasks that are still PENDING past the expiration window.
      # Runs on the workpool like the other handlers, so one failing update is
      # collected by process_workpool_exceptions instead of aborting the whole sync.
      def fail_expired_pending_tasks(expired_pending)
        expired_pending.each do |task_guid|
          workpool.submit(task_guid) do |guid|
            # Re-query with the state filter: the task may have left PENDING since its batch was read.
            task = TaskModel.where(guid:, state: TaskModel::PENDING_STATE).first
            next unless task

            task.update(state: TaskModel::FAILED_STATE, failure_reason: 'Task expired in PENDING state')
            logger.info('expired-pending-task', task_guid: guid)
          end
        end
      end

      # Asks Diego to cancel every task whose guid has no CC record.
      # @param to_cancel_missing [Hash] Diego tasks keyed by task guid; only the keys are used
      def cancel_missing_cc_tasks(to_cancel_missing)
        to_cancel_missing.each_key do |task_guid|
          workpool.submit(task_guid) do |guid|
            bbs_task_client.cancel_task(guid)
            logger.info('missing-cc-task', task_guid: guid)
          end
        end
      end

      # Yields CC tasks in id order, BATCH_SIZE at a time, using keyset pagination on tasks.id.
      # Only id, guid, state and created_at are loaded.
      def batched_cc_tasks
        last_id = 0
        loop do
          tasks = TaskModel.where(
            Sequel.lit('tasks.id > ?', last_id)
          ).order(:id).limit(BATCH_SIZE).select(:id, :guid, :state, :created_at).all
          yield tasks
          return if tasks.count < BATCH_SIZE

          last_id = tasks.last.id
        end
      end

      def bbs_task_client
        CloudController::DependencyLocator.instance.bbs_task_client
      end

      def logger
        @logger ||= Steno.logger('cc.diego.sync.tasks')
      end
    end
  end
end