Skip to content
This repository was archived by the owner on Oct 16, 2025. It is now read-only.

Commit 372b9a0

Browse files
committed
simplify nodes
1 parent 9cb255a commit 372b9a0

1 file changed

Lines changed: 3 additions & 194 deletions

File tree

src-colladmin/actions/zookeeper_action.rb

Lines changed: 3 additions & 194 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def perform_action
8484
# Collection Admin Task class - see config/actions.yml for description
8585
class ZookeeperDumpAction < ZookeeperAction
8686
def initialize(config, action, path, myparams)
87+
@node_dump = MerrittZK::NodeDump.new(@zk, myparams)
8788
@zkpath = myparams.fetch('zkpath', '/')
8889
@mode = myparams.fetch('mode', 'data')
8990
@full = false
@@ -93,193 +94,7 @@ def initialize(config, action, path, myparams)
9394
end
9495

9596
def data
96-
@buf = StringIO.new
97-
@buf << "Node State as of #{Time.now}:\n"
98-
dump_node(@zkpath)
99-
@buf.rewind
100-
@buf.read
101-
end
102-
103-
def standard_node(n)
104-
n =~ %r{^/(access|batch-uuids|batches|jobs|locks|migration)(/|$)}
105-
end
106-
107-
def system_node(n)
108-
n =~ %r{^/zookeeper(/|$)}
109-
end
110-
111-
def show_data(n)
112-
d = get_data(n)
113-
df = d.is_a?(Hash) ? "\n#{JSON.pretty_generate(d)}" : " #{d}"
114-
df = df.encode('UTF-8', invalid: :replace, undef: :replace, replace: '?')
115-
@buf << df unless @buf.nil?
116-
rescue StandardError => e
117-
@buf << e
118-
end
119-
120-
def get_data(n)
121-
d = @zk.get(n)[0]
122-
return '' if d.nil?
123-
124-
begin
125-
JSON.parse(d.encode('UTF-8', invalid: :replace, undef: :replace, replace: '?'), symbolize_names: true)
126-
rescue JSON::ParserError
127-
d
128-
rescue StandardError => e
129-
"\n #{e.class}:#{e}:\n #{d}"
130-
end
131-
end
132-
133-
def node_datetime(n)
134-
return 'na' unless @zk.exists?(n)
135-
136-
ctime = @zk.stat(n).ctime
137-
ctime.nil? ? 'na' : Time.at(ctime / 1000).strftime('%Y-%m-%d %H:%M:%S')
138-
end
139-
140-
def node_stat(n)
141-
return 'FAIL' unless @zk.exists?(n)
142-
143-
ctime = @zk.stat(n).ctime
144-
return 'FAIL' if ctime.nil?
145-
146-
Time.now - Time.at(ctime / 1000) > 3600 ? 'FAIL' : 'WARN'
147-
end
148-
149-
def test_node(path, deleteable, n)
150-
return if @zk.exists?(n)
151-
152-
result = { path: path, test: "Test: #{n} should exist", status: node_stat(path) }
153-
@test_results.append([
154-
result[:path], node_datetime(path), deleteable ? result[:path] : '', result[:test],
155-
result[:status]
156-
])
157-
@buf << "\n #{result[:test]}: #{result[:status]}" unless @buf.nil?
158-
end
159-
160-
def test_has_children(path, deleteable, n)
161-
return if @zk.exists?(n) && !@zk.children(n).empty?
162-
163-
result = { path: path, test: "Test: #{n} should have children", status: node_stat(path) }
164-
@test_results.append([
165-
result[:path], node_datetime(path), deleteable ? result[:path] : '', result[:test],
166-
result[:status]
167-
])
168-
@buf << "\n #{result[:test]}: #{result[:status]}" unless @buf.nil?
169-
end
170-
171-
def test_not_node(path, deleteable, n)
172-
return unless @zk.exists?(n)
173-
174-
result = { path: path, test: "Test: #{n} should NOT exist", status: node_stat(path) }
175-
@test_results.append([
176-
result[:path], node_datetime(path), deleteable ? result[:path] : '', result[:test],
177-
result[:status]
178-
])
179-
@buf << "\n #{result[:test]}: #{result[:status]}" unless @buf.nil?
180-
end
181-
182-
def show_test(n)
183-
rx1 = %r{^/batches/bid[0-9]+/states/batch-.*/(jid[0-9]+)$}
184-
rx2 = %r{^/jobs/(jid[0-9]+)/bid$}
185-
rx3 = %r{^/jobs/(jid[0-9]+)$}
186-
rx4 = %r{^/jobs/states/[^/]*/[0-9][0-9]-(jid[0-9]+)$}
187-
rx5 = %r{^/batches/bid[0-9]+/states$}
188-
189-
case n
190-
when %r{^/batch-uuids/(.*)}
191-
d = get_data(n)
192-
test_node(n, true, "/batches/#{d}")
193-
when %r{^/batches/bid[0-9]+/submission}
194-
d = get_data(n).fetch(:batchID, 'na')
195-
test_node(n, false, "/batch-uuids/#{d}")
196-
when rx1
197-
jid = rx1.match(n)[1]
198-
test_node(n, true, "/jobs/#{jid}")
199-
when rx2
200-
jid = rx2.match(n)[1]
201-
bid = get_data(n)
202-
test_node(n, false, "/batches/#{bid}")
203-
snode = "/jobs/#{jid}/status"
204-
test_node(n, true, snode)
205-
if @zk.exists?(snode)
206-
d = get_data(snode)
207-
return if d.nil?
208-
209-
status = d.fetch(:status, 'na').downcase
210-
case status
211-
when 'deleted'
212-
bstatus = 'batch-deleted'
213-
when 'completed'
214-
bstatus = 'batch-completed'
215-
when 'failed'
216-
bstatus = 'batch-failed'
217-
else
218-
bstatus = 'batch-processing'
219-
end
220-
test_node(n, false, "/batches/#{bid}/states/#{bstatus}/#{jid}")
221-
%w[batch-deleted batch-completed batch-failed batch-processing].each do |ts|
222-
next if ts == bstatus
223-
224-
test_not_node(n, false, "/batches/#{bid}/states/#{ts}/#{jid}")
225-
end
226-
end
227-
when rx3
228-
jid = rx3.match(n)[1]
229-
snode = "/jobs/#{jid}/status"
230-
test_node(n, true, snode)
231-
if @zk.exists?(snode)
232-
d = get_data(snode)
233-
status = d.fetch(:status, 'na').downcase
234-
priority = get_data("#{n}/priority")
235-
test_node(n, false, "/jobs/states/#{status}/#{format('%02d', priority)}-#{jid}")
236-
end
237-
when rx4
238-
jid = rx4.match(n)[1]
239-
test_node(n, true, "/jobs/#{jid}")
240-
@job_states_count[jid] = [] unless @job_states_count.key?(jid)
241-
@job_states_count[jid].append(n)
242-
when rx5
243-
test_has_children(n, false, n)
244-
end
245-
end
246-
247-
def report_node(n)
248-
@buf << "#{n}:" unless @buf.nil?
249-
if standard_node(n)
250-
show_data(n) if @mode == 'data'
251-
show_test(n) if @mode == 'test'
252-
else
253-
@buf << " Unsupported\n" unless @buf.nil?
254-
end
255-
@buf << "\n" unless @buf.nil?
256-
end
257-
258-
def check_full
259-
return false if @buf.nil?
260-
return true if @full
261-
262-
# Lambda payload limit. May need to save output to S3.
263-
if @buf.size > 250_000
264-
@buf << '... (truncated at 256K)'
265-
@full = true
266-
end
267-
@full
268-
end
269-
270-
def dump_node(n = '/')
271-
return if check_full
272-
return unless @zk.exists?(n)
273-
return if system_node(n)
274-
275-
report_node(n)
276-
arr = @zk.children(n)
277-
return if arr.empty?
278-
279-
arr.sort.each do |cp|
280-
p = "#{n}/#{cp}".gsub(%r{/+}, '/')
281-
dump_node(p)
282-
end
97+
@node_dump.listing
28398
end
28499
end
285100

@@ -299,13 +114,7 @@ def table_types
299114
end
300115

301116
def table_rows(_body)
302-
dump_node(@zkpath)
303-
@job_states_count.each_value do |states|
304-
next unless states.length > 1
305-
306-
@test_results.append([states.to_s, '', '', 'Duplicate JID', 'FAIL'])
307-
end
308-
@test_results
117+
[]
309118
end
310119

311120
def perform_action

0 commit comments

Comments
 (0)