-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathparallelize.rb
More file actions
74 lines (66 loc) · 2.88 KB
/
parallelize.rb
File metadata and controls
74 lines (66 loc) · 2.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# frozen_string_literal: true
# Map a code block onto an array, where each array element executes in parallel.
# This function is experimental.
#
# > **Note:** Not available in apply block.
Puppet::Functions.create_function(:parallelize, Puppet::Functions::InternalFunction) do
# Map a block onto an array, where each array element executes in parallel.
# This function is experimental.
# @param data The array to apply the block to.
# @param block The code block to execute for each array element.
# @return [Array] An array of PlanResult objects. Each input from the input
# array returns a corresponding PlanResult object.
# @example Execute two tasks on two targets.
# $targets = get_targets(["host1", "host2"])
# $result = parallelize ($targets) |$t| {
# run_task('a', $t)
# run_task('b', $t)
# }
dispatch :parallelize do
scope_param
param 'Array[Any]', :data
block_param 'Callable[Any]', :block
return_type 'Array[Boltlib::PlanResult]'
end
def parallelize(scope, data, &block)
unless Puppet[:tasks]
raise Puppet::ParseErrorWithIssue
.from_issue_and_stack(Bolt::PAL::Issues::PLAN_OPERATION_NOT_SUPPORTED_WHEN_COMPILING, action: 'parallelize')
end
executor = Puppet.lookup(:bolt_executor)
executor.report_function_call(self.class.name)
futures = data.map do |object|
# We're going to immediately wait for these futures, *and* don't want
# their results to be returned as part of `wait()`, so use a 'dummy'
# value as the plan_id. This could also be nil, though in general we want
# to require Futures to have a plan stack so that they don't get lost.
executor.create_future(scope: scope, plan_id: 'parallel') do |newscope|
# Catch 'return' calls inside the block
result = catch(:return) do
# Add the object to the block parameters
args = { block.parameters[0][1].to_s => object }
# Execute the block. Individual plan steps in the block will yield
# the Fiber if they haven't finished, so all this needs to do is run
# the block.
block.closure.call_by_name_with_scope(newscope, args, true)
end
# If we got a return from the block, get its value
# Otherwise the result is the last line from the block
result = result.value if result.is_a?(Puppet::Pops::Evaluator::Return)
# Validate the result is a PlanResult
unless Puppet::Pops::Types::TypeParser.singleton.parse('Boltlib::PlanResult').instance?(result)
raise Bolt::InvalidParallelResult.new(result.to_s, *Puppet::Pops::PuppetStack.top_of_stack)
end
result
rescue Puppet::PreformattedError => e
if e.cause.is_a?(Bolt::Error)
e.cause
else
raise e
end
end
end
# We may eventually want parallelize to accept a timeout
executor.wait(futures)
end
end