Skip to content

Commit dd16218

Browse files
authored
Avoid thread fanout for single-item parallel maps (#1160)
1 parent 5db050e commit dd16218

5 files changed

Lines changed: 396 additions & 19 deletions

File tree

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
#!/usr/bin/env ruby
2+
# Copyright 2024 - 2026 Block, Inc.
3+
#
4+
# Use of this source code is governed by an MIT-style
5+
# license that can be found in the LICENSE file or at
6+
# https://opensource.org/licenses/MIT.
7+
#
8+
# frozen_string_literal: true
9+
10+
# Benchmarks the single-item fast path in ElasticGraph::Support::Threading.parallel_map
11+
# using local copies of the before/after implementations.
12+
#
13+
# Run with:
14+
# bundle exec ruby benchmarks/threading/parallel_map_single_item.rb
15+
16+
require "benchmark/ips"
17+
18+
# Local copy of `ElasticGraph::Support::Threading.parallel_map` as it was
# before #1160: it unconditionally spawns one Thread per item, even for
# empty or single-item inputs. Kept token-for-token (no restyling) so the
# benchmark measures the original code path faithfully.
module OriginalThreadingImplementation
  def self.parallel_map(items)
    # `_ =` is copied from the library source (presumably a Steep/RBS
    # type-cast idiom -- TODO confirm against upstream); at runtime it is
    # just an assignment to a throwaway local.
    threads = _ = items.map do |item|
      ::Thread.new do
        # Suppress the default $stderr dump for in-thread exceptions;
        # `Thread#value` below re-raises them in the calling thread anyway.
        ::Thread.current.report_on_exception = false

        yield item
      end
    end

    # `value` joins each thread and either returns its result or re-raises
    # whatever exception killed it.
    threads.map(&:value)
  rescue => e
    # Append the caller's frames so the backtrace spans both threads.
    e.set_backtrace(e.backtrace + caller)
    raise e
  end
end
34+
35+
# Local copy of `parallel_map` after #1160: inputs with fewer than two items
# are mapped inline on the calling thread, while larger inputs keep the
# original one-thread-per-item behavior. Kept token-for-token (no restyling)
# so the benchmark measures the shipped code path faithfully.
module UpdatedThreadingImplementation
  def self.parallel_map(items)
    # Fast path: empty and single-item inputs gain nothing from threading,
    # so map them directly and skip thread spawn/join overhead entirely.
    return _ = items.map { |item| yield item } if items.size < 2

    begin
      threads = _ = items.map do |item|
        ::Thread.new do
          # Suppress the default $stderr dump for in-thread exceptions;
          # `Thread#value` below re-raises them in the calling thread anyway.
          ::Thread.current.report_on_exception = false

          yield item
        end
      end

      # `value` joins each thread and either returns its result or re-raises
      # whatever exception killed it.
      threads.map(&:value)
    rescue => e
      # Append the caller's frames so the backtrace spans both threads.
      e.set_backtrace(e.backtrace + caller)
      raise e
    end
  end
end
55+
56+
# Fixture inputs covering every dispatch case the fast path distinguishes:
# empty, single-item (fast path), and multi-item (threaded path), for both
# Arrays and Hashes. Frozen so repeated benchmark iterations share one object.
SINGLE_ARRAY = ["a"].freeze
EMPTY_ARRAY = [].freeze
MULTI_ITEM_ARRAY = %w[a b c d].freeze
SINGLE_HASH = {"orders" => [1, 2, 3]}.freeze
EMPTY_HASH = {}.freeze
MULTI_ENTRY_HASH = {
  "orders" => [1, 2, 3],
  "payments" => [4, 5, 6],
  "refunds" => [7, 8, 9],
  "disputes" => [10, 11, 12]
}.freeze
67+
68+
# Thin, uniformly-named entry point for the "after" implementation so the
# benchmark call sites read symmetrically with the "before" calls.
def updated_parallel_map(...)
  UpdatedThreadingImplementation.parallel_map(...)
end
71+
72+
# Runs the given scenario block once against each implementation and aborts
# the whole benchmark run if their results ever disagree.
def assert_same_result(label)
  original, updated = [OriginalThreadingImplementation, UpdatedThreadingImplementation].map do |implementation|
    yield implementation
  end

  unless original == updated
    abort "#{label} produced different results: #{original.inspect} != #{updated.inspect}"
  end
end
78+
79+
# Fail fast if the two implementations ever disagree, before spending any
# time benchmarking. Covers arrays and hashes in all three size regimes.
assert_same_result("single array") { |implementation| implementation.parallel_map(SINGLE_ARRAY, &:next) }
assert_same_result("empty array") { |implementation| implementation.parallel_map(EMPTY_ARRAY, &:next) }
assert_same_result("multi item array") { |implementation| implementation.parallel_map(MULTI_ITEM_ARRAY, &:next) }
assert_same_result("single hash") { |implementation| implementation.parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } }
assert_same_result("empty hash") { |implementation| implementation.parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } }
assert_same_result("multi entry hash") do |implementation|
  implementation.parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] }
end
87+
88+
# Prints a titled banner, then runs a benchmark-ips comparison whose reports
# are registered by the given block (which receives the ips job object).
def run_ips(title)
  banner = "=" * 80
  puts "", banner, title, banner

  Benchmark.ips do |x|
    x.config(time: 5, warmup: 2)
    yield x
    x.compare!
  end
end
100+
101+
# Timed comparisons. The first two scenarios exercise the new fast path; the
# empty and multi-item scenarios guard against regressions on inputs whose
# behavior is expected to be unchanged.
run_ips("single item array") do |x|
  x.report("before: always spawn thread") { OriginalThreadingImplementation.parallel_map(SINGLE_ARRAY, &:next) }
  x.report("after: fast path") { updated_parallel_map(SINGLE_ARRAY, &:next) }
end

run_ips("single entry hash, like one datastore-client msearch fanout") do |x|
  x.report("before: always spawn thread") do
    OriginalThreadingImplementation.parallel_map(SINGLE_HASH) { |key, values| [key, values.size] }
  end

  x.report("after: fast path") do
    updated_parallel_map(SINGLE_HASH) { |key, values| [key, values.size] }
  end
end

run_ips("empty array") do |x|
  x.report("before: always spawn thread") { OriginalThreadingImplementation.parallel_map(EMPTY_ARRAY, &:next) }
  x.report("after: fast path") { updated_parallel_map(EMPTY_ARRAY, &:next) }
end

run_ips("empty hash") do |x|
  x.report("before: always spawn thread") do
    OriginalThreadingImplementation.parallel_map(EMPTY_HASH) { |key, values| [key, values.size] }
  end

  x.report("after: fast path") do
    updated_parallel_map(EMPTY_HASH) { |key, values| [key, values.size] }
  end
end

run_ips("multi item array, expected to stay on the threaded path") do |x|
  x.report("before: always spawn thread") { OriginalThreadingImplementation.parallel_map(MULTI_ITEM_ARRAY, &:next) }
  x.report("after: current implementation") { updated_parallel_map(MULTI_ITEM_ARRAY, &:next) }
end

run_ips("multi entry hash, expected to stay on the threaded path") do |x|
  x.report("before: always spawn thread") do
    OriginalThreadingImplementation.parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] }
  end

  x.report("after: current implementation") do
    updated_parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] }
  end
end
145+
146+
# Tallies `Thread.new` invocations. Intended to be prepended onto Thread's
# singleton class so every thread spawn in the process bumps the count.
module ThreadNewCounter
  class << self
    # Lets the benchmark reset the tally between scenarios.
    attr_writer :count

    # Current tally, lazily defaulting to zero on first read.
    def count
      @count ||= 0
    end
  end

  # Records the call, then delegates to the real `new` via `super`.
  # NOTE(review): the increment is unsynchronized; that is fine here since
  # this script only calls Thread.new from the main thread.
  def new(...)
    ThreadNewCounter.count += 1
    super
  end
end
160+
161+
# Install the counting hook on Thread's singleton class so that every
# `Thread.new` call in this process is tallied by ThreadNewCounter.
::Thread.singleton_class.prepend(ThreadNewCounter)
164+
165+
# Resets the Thread.new tally, runs the given block, and returns how many
# threads the block ended up spawning.
def count_thread_new_calls(&work)
  ThreadNewCounter.count = 0
  work.call
  ThreadNewCounter.count
end
170+
171+
# Direct evidence for the fast path: count actual Thread.new invocations over
# a fixed number of map calls per scenario. Expected: 0 for the "after"
# single-item and both empty cases, and unchanged (items * iterations) for
# the multi-item cases.
thread_count_iterations = 1_000

puts
puts "Thread.new calls for #{thread_count_iterations} map calls"
puts "-" * 80
{
  "before single array" => -> { OriginalThreadingImplementation.parallel_map(SINGLE_ARRAY, &:next) },
  "after single array" => -> { updated_parallel_map(SINGLE_ARRAY, &:next) },
  "before single hash" => -> { OriginalThreadingImplementation.parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } },
  "after single hash" => -> { updated_parallel_map(SINGLE_HASH) { |key, values| [key, values.size] } },
  "before empty array" => -> { OriginalThreadingImplementation.parallel_map(EMPTY_ARRAY, &:next) },
  "after empty array" => -> { updated_parallel_map(EMPTY_ARRAY, &:next) },
  "before empty hash" => -> { OriginalThreadingImplementation.parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } },
  "after empty hash" => -> { updated_parallel_map(EMPTY_HASH) { |key, values| [key, values.size] } },
  "before multi item array" => -> { OriginalThreadingImplementation.parallel_map(MULTI_ITEM_ARRAY, &:next) },
  "after multi item array" => -> { updated_parallel_map(MULTI_ITEM_ARRAY, &:next) },
  "before multi entry hash" => -> { OriginalThreadingImplementation.parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } },
  "after multi entry hash" => -> { updated_parallel_map(MULTI_ENTRY_HASH) { |key, values| [key, values.size] } }
}.each do |label, callable|
  count = count_thread_new_calls do
    thread_count_iterations.times { callable.call }
  end

  puts "#{label.ljust(28)} #{count}"
end
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
2+
================================================================================
3+
single item array
4+
================================================================================
5+
ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20]
6+
Warming up --------------------------------------
7+
before: always spawn thread
8+
2.938k i/100ms
9+
after: fast path 408.510k i/100ms
10+
Calculating -------------------------------------
11+
before: always spawn thread
12+
25.825k (± 6.5%) i/s (38.72 μs/i) - 129.272k in 5.027934s
13+
after: fast path 3.993M (± 3.9%) i/s (250.43 ns/i) - 20.017M in 5.021850s
14+
15+
Comparison:
16+
after: fast path: 3993083.5 i/s
17+
before: always spawn thread: 25824.7 i/s - 154.62x slower
18+
19+
20+
================================================================================
21+
single entry hash, like one datastore-client msearch fanout
22+
================================================================================
23+
ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20]
24+
Warming up --------------------------------------
25+
before: always spawn thread
26+
2.668k i/100ms
27+
after: fast path 318.552k i/100ms
28+
Calculating -------------------------------------
29+
before: always spawn thread
30+
21.520k (±19.5%) i/s (46.47 μs/i) - 104.052k in 5.098624s
31+
after: fast path 3.278M (± 5.4%) i/s (305.03 ns/i) - 16.565M in 5.069629s
32+
33+
Comparison:
34+
after: fast path: 3278402.5 i/s
35+
before: always spawn thread: 21519.8 i/s - 152.34x slower
36+
37+
38+
================================================================================
39+
empty array
40+
================================================================================
41+
ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20]
42+
Warming up --------------------------------------
43+
before: always spawn thread
44+
735.659k i/100ms
45+
after: fast path 723.657k i/100ms
46+
Calculating -------------------------------------
47+
before: always spawn thread
48+
7.120M (± 8.2%) i/s (140.45 ns/i) - 35.312M in 5.009859s
49+
after: fast path 7.094M (± 4.8%) i/s (140.96 ns/i) - 35.459M in 5.011149s
50+
51+
Comparison:
52+
before: always spawn thread: 7119941.7 i/s
53+
after: fast path: 7094392.7 i/s - same-ish: difference falls within error
54+
55+
56+
================================================================================
57+
empty hash
58+
================================================================================
59+
ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20]
60+
Warming up --------------------------------------
61+
before: always spawn thread
62+
519.319k i/100ms
63+
after: fast path 538.244k i/100ms
64+
Calculating -------------------------------------
65+
before: always spawn thread
66+
5.077M (±10.5%) i/s (196.95 ns/i) - 25.447M in 5.094679s
67+
after: fast path 5.290M (± 8.2%) i/s (189.05 ns/i) - 26.374M in 5.036762s
68+
69+
Comparison:
70+
after: fast path: 5289538.9 i/s
71+
before: always spawn thread: 5077366.9 i/s - same-ish: difference falls within error
72+
73+
74+
================================================================================
75+
multi item array, expected to stay on the threaded path
76+
================================================================================
77+
ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20]
78+
Warming up --------------------------------------
79+
before: always spawn thread
80+
734.000 i/100ms
81+
after: current implementation
82+
704.000 i/100ms
83+
Calculating -------------------------------------
84+
before: always spawn thread
85+
7.063k (± 8.0%) i/s (141.58 μs/i) - 35.232k in 5.025075s
86+
after: current implementation
87+
6.363k (± 8.0%) i/s (157.16 μs/i) - 31.680k in 5.010779s
88+
89+
Comparison:
90+
before: always spawn thread: 7063.1 i/s
91+
after: current implementation: 6363.0 i/s - same-ish: difference falls within error
92+
93+
94+
================================================================================
95+
multi entry hash, expected to stay on the threaded path
96+
================================================================================
97+
ruby 3.4.7 (2025-10-08 revision 7a5688e2a2) +PRISM [arm64-darwin20]
98+
Warming up --------------------------------------
99+
before: always spawn thread
100+
455.000 i/100ms
101+
after: current implementation
102+
652.000 i/100ms
103+
Calculating -------------------------------------
104+
before: always spawn thread
105+
5.907k (±14.3%) i/s (169.29 μs/i) - 28.665k in 5.019039s
106+
after: current implementation
107+
6.306k (±11.1%) i/s (158.59 μs/i) - 31.296k in 5.042971s
108+
109+
Comparison:
110+
after: current implementation: 6305.7 i/s
111+
before: always spawn thread: 5906.9 i/s - same-ish: difference falls within error
112+
113+
114+
Thread.new calls for 1000 map calls
115+
--------------------------------------------------------------------------------
116+
before single array 1000
117+
after single array 0
118+
before single hash 1000
119+
after single hash 0
120+
before empty array 0
121+
after empty array 0
122+
before empty hash 0
123+
after empty hash 0
124+
before multi item array 4000
125+
after multi item array 4000
126+
before multi entry hash 4000
127+
after multi entry hash 4000

elasticgraph-support/lib/elastic_graph/support/threading.rb

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,33 +10,38 @@ module ElasticGraph
1010
module Support
1111
# @private
1212
module Threading
13-
# Like Enumerable#map, but performs the map in parallel using one thread per list item.
13+
# Like Enumerable#map, but performs the map in parallel using one thread per list item
14+
# when there are multiple items.
1415
# Exceptions that happen in the threads will propagate to the caller at the end.
1516
# Due to Ruby's GVL, this will never be helpful for pure computation, but can be
1617
# quite helpful when dealing with blocking I/O. However, the cost of threads is
1718
# such that this method should not be used when you have a large list of items to
1819
# map over (say, hundreds or thousands of items or more).
1920
def self.parallel_map(items)
20-
threads = _ = items.map do |item|
21-
::Thread.new do
22-
# Disable reporting of exceptions. We use `value` at the end of this method, which
23-
# propagates any exception that happened in the thread to the calling thread. If
24-
# this is true (the default), then the exception is also printed to $stderr which
25-
# is quite noisy.
26-
::Thread.current.report_on_exception = false
21+
return _ = items.map { |item| yield item } if items.size < 2
2722

28-
yield item
23+
begin
24+
threads = _ = items.map do |item|
25+
::Thread.new do
26+
# Disable reporting of exceptions. We use `value` at the end of this method, which
27+
# propagates any exception that happened in the thread to the calling thread. If
28+
# this is true (the default), then the exception is also printed to $stderr which
29+
# is quite noisy.
30+
::Thread.current.report_on_exception = false
31+
32+
yield item
33+
end
2934
end
30-
end
3135

32-
# `value` here either returns the value of the final expression in the thread, or raises
33-
# whatever exception happened in the thread. `join` doesn't propagate the exception in
34-
# the same way, so we always want to use `Thread#value` even if we are just using threads
35-
# for side effects.
36-
threads.map(&:value)
37-
rescue => e
38-
e.set_backtrace(e.backtrace + caller)
39-
raise e
36+
# `value` here either returns the value of the final expression in the thread, or raises
37+
# whatever exception happened in the thread. `join` doesn't propagate the exception in
38+
# the same way, so we always want to use `Thread#value` even if we are just using threads
39+
# for side effects.
40+
threads.map(&:value)
41+
rescue => e
42+
e.set_backtrace(e.backtrace + caller)
43+
raise e
44+
end
4045
end
4146
end
4247
end

elasticgraph-support/sig/elastic_graph/support/threading.rbs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module ElasticGraph
33
module Threading
44
def self.parallel_map:
55
[K, V, O] (::Hash[K, V]) { ([K, V]) -> O } -> ::Array[O]
6-
| [I, O] (::Enumerable[I]) { (I) -> O } -> ::Array[O]
6+
| [I, O] (::Array[I]) { (I) -> O } -> ::Array[O]
77
end
88
end
99
end

0 commit comments

Comments
 (0)