Skip to content

Commit 311de62

Browse files
committed
Fix ReactorBuffer registration in subinterpreters and add benchmark
1 parent 5ae0b48 commit 311de62

2 files changed

Lines changed: 294 additions & 0 deletions

File tree

c_src/py_subinterp_pool.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
*/
2626

2727
#include "py_subinterp_pool.h"
28+
#include "py_reactor_buffer.h"
2829
#include <string.h>
2930

3031
#ifdef HAVE_SUBINTERPRETERS
@@ -163,6 +164,12 @@ int subinterp_pool_init(int size) {
163164
PyErr_Clear();
164165
/* Non-fatal - continue without erlang module */
165166
} else {
167+
/* Register ReactorBuffer with erlang module in this subinterpreter */
168+
if (ReactorBuffer_register_with_reactor() < 0) {
169+
PyErr_Clear();
170+
/* Non-fatal - ReactorBuffer just won't be available */
171+
}
172+
166173
/* Import erlang module into globals */
167174
PyObject *erlang_module = PyImport_ImportModule("erlang");
168175
if (erlang_module != NULL) {

examples/bench_reactor_buffer.erl

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
#!/usr/bin/env escript
2+
%% -*- erlang -*-
3+
%%! -pa _build/default/lib/erlang_python/ebin
4+
5+
%%% @doc Benchmark script for ReactorBuffer zero-copy performance.
6+
%%%
7+
%%% Run with:
8+
%%% rebar3 compile && escript examples/bench_reactor_buffer.erl
9+
10+
-mode(compile).
11+
12+
%% Escript entry point. Command-line arguments are ignored.
%%
%% Prints a banner, starts the erlang_python application and its Python
%% contexts, reports the Erlang/OTP and Python versions, runs the three
%% benchmarks in order, prints a closing banner and halts with status 0.
main(_Args) ->
    Rule = "========================================",

    io:format("~n~s~n", [Rule]),
    io:format("ReactorBuffer Zero-Copy Benchmark~n"),
    io:format("~s~n~n", [Rule]),

    %% Assertive matches: any start-up failure aborts the script right here.
    {ok, _} = application:ensure_all_started(erlang_python),
    {ok, _} = py:start_contexts(),

    %% Print system info
    io:format("System Information:~n"),
    OtpRelease = erlang:system_info(otp_release),
    io:format(" Erlang/OTP: ~s~n", [OtpRelease]),
    {ok, PyVer} = py:version(),
    io:format(" Python: ~s~n", [PyVer]),
    io:format("~n"),

    %% Run benchmarks, one after another, in this fixed order.
    Benchmarks = [
        fun run_buffer_operations_bench/0,
        fun run_protocol_simulation_bench/0,
        fun run_echo_protocol_bench/0
    ],
    lists:foreach(fun(Bench) -> Bench() end, Benchmarks),

    io:format("~n~s~n", [Rule]),
    io:format("Benchmark Complete~n"),
    io:format("~s~n", [Rule]),

    halt(0).
38+
39+
%% Micro-benchmark of single operations on a ReactorBuffer versus a plain
%% Python bytes object.
%%
%% The embedded Python script times, for each payload size,
%% bytearray.extend/1 and startswith/1 on both kinds of object and stores
%% the elapsed times (in milliseconds) in `_buffer_ops_results`, keyed by
%% size. The Erlang side fetches that dict and prints one table row per
%% operation. The ratio column is bytes time divided by buffer time, so a
%% value above 1 means the ReactorBuffer run took less time.
run_buffer_operations_bench() ->
    io:format("~n--- Buffer Operations Benchmark ---~n"),
    io:format("Iterations: 10000~n~n"),

    PySource = <<"
import time
import erlang

def run_buffer_ops_bench(iterations=10000):
    results = {}
    sizes = [64, 256, 1024, 4096, 16384, 65536]

    for size in sizes:
        test_data = b'X' * size
        buf = erlang.ReactorBuffer._test_create(test_data)
        regular_bytes = bytes(test_data)

        # Benchmark: extend bytearray (uses buffer protocol)
        start = time.perf_counter()
        for _ in range(iterations):
            ba = bytearray()
            ba.extend(buf)
        extend_buf_time = time.perf_counter() - start

        start = time.perf_counter()
        for _ in range(iterations):
            ba = bytearray()
            ba.extend(regular_bytes)
        extend_bytes_time = time.perf_counter() - start

        # Benchmark: startswith
        prefix = test_data[:10]
        start = time.perf_counter()
        for _ in range(iterations):
            _ = buf.startswith(prefix)
        startswith_buf_time = time.perf_counter() - start

        start = time.perf_counter()
        for _ in range(iterations):
            _ = regular_bytes.startswith(prefix)
        startswith_bytes_time = time.perf_counter() - start

        results[size] = {
            'extend_buf': extend_buf_time * 1000,
            'extend_bytes': extend_bytes_time * 1000,
            'startswith_buf': startswith_buf_time * 1000,
            'startswith_bytes': startswith_bytes_time * 1000,
        }

    return results

_buffer_ops_results = run_buffer_ops_bench()
">>,

    ok = py:exec(PySource),
    {ok, ResultsBySize} = py:eval(<<"_buffer_ops_results">>),

    io:format("~8s | ~12s | ~12s | ~12s | ~8s~n",
              ["Size", "Operation", "Buffer (ms)", "Bytes (ms)", "Ratio"]),
    io:format("~s~n", [lists:duplicate(60, $-)]),

    %% The 0.001 ms floor keeps the division defined when a timing is zero.
    Ratio = fun(BytesMs, BufMs) -> BytesMs / max(BufMs, 0.001) end,

    PrintSize = fun(Size) ->
        Timings = maps:get(Size, ResultsBySize),

        %% Read and derive every value first, then print both rows.
        ExtendBuf = maps:get(<<"extend_buf">>, Timings),
        ExtendBytes = maps:get(<<"extend_bytes">>, Timings),
        ExtendRatio = Ratio(ExtendBytes, ExtendBuf),

        StartsBuf = maps:get(<<"startswith_buf">>, Timings),
        StartsBytes = maps:get(<<"startswith_bytes">>, Timings),
        StartsRatio = Ratio(StartsBytes, StartsBuf),

        %% Only the first row of each size carries the size label.
        io:format("~8B | ~12s | ~12.3f | ~12.3f | ~.2f x~n",
                  [Size, "extend", ExtendBuf, ExtendBytes, ExtendRatio]),
        io:format("~8s | ~12s | ~12.3f | ~12.3f | ~.2f x~n",
                  ["", "startswith", StartsBuf, StartsBytes, StartsRatio])
    end,

    %% Must list the same sizes as the Python script above.
    lists:foreach(PrintSize, [64, 256, 1024, 4096, 16384, 65536]),
    ok.
118+
119+
%% Simulated request-handling benchmark: ReactorBuffer versus plain bytes.
%%
%% The embedded Python script builds, for each payload size, an HTTP-like
%% request of exactly that many bytes and runs `parse_request` on it
%% (two startswith checks, one find, one bytearray.extend) both as a
%% ReactorBuffer and as a bytes object. It stores operations per second
%% for each variant in `_protocol_sim_results`, keyed by size.
%%
%% The Erlang side prints one row per size. Speedup is buffer ops/s divided
%% by bytes ops/s, so a value above 1 means the ReactorBuffer variant was
%% faster. A summary with the mean speedup for sizes >= 1024 follows.
run_protocol_simulation_bench() ->
    io:format("~n--- Protocol Simulation Benchmark ---~n"),
    io:format("Iterations: 5000~n~n"),

    Code = <<"
import time
import erlang

def run_protocol_sim_bench(iterations=5000):
    results = {}
    sizes = [64, 256, 1024, 4096, 16384, 65536]
    header = b'GET / HTTP/1.1\\r\\nHost: example.com\\r\\n\\r\\n'

    def parse_request(data):
        if data.startswith(b'GET'):
            method = 'GET'
        elif data.startswith(b'POST'):
            method = 'POST'
        else:
            method = 'OTHER'
        pos = data.find(b'\\r\\n\\r\\n')
        write_buf = bytearray()
        write_buf.extend(data)
        return len(write_buf)

    for size in sizes:
        # Pad with filler and trim so the payload is exactly `size` bytes,
        # whatever the header length is.
        test_data = (header + b'X' * size)[:size]

        buf = erlang.ReactorBuffer._test_create(test_data)
        regular_bytes = bytes(test_data)

        # Benchmark with ReactorBuffer
        start = time.perf_counter()
        for _ in range(iterations):
            _ = parse_request(buf)
        buf_time = time.perf_counter() - start

        # Benchmark with regular bytes
        start = time.perf_counter()
        for _ in range(iterations):
            _ = parse_request(regular_bytes)
        bytes_time = time.perf_counter() - start

        results[size] = {
            'buffer_ops_per_sec': iterations / buf_time,
            'bytes_ops_per_sec': iterations / bytes_time,
        }

    return results

_protocol_sim_results = run_protocol_sim_bench()
">>,

    ok = py:exec(Code),
    {ok, Results} = py:eval(<<"_protocol_sim_results">>),

    io:format("~8s | ~14s | ~14s | ~8s~n",
              ["Size", "Buffer (ops/s)", "Bytes (ops/s)", "Speedup"]),
    io:format("~s~n", [lists:duplicate(52, $-)]),

    %% Must list the same sizes as the Python script above.
    Sizes = [64, 256, 1024, 4096, 16384, 65536],
    SpeedupsList = lists:map(fun(Size) ->
        Data = maps:get(Size, Results),

        BufOps = maps:get(<<"buffer_ops_per_sec">>, Data),
        BytesOps = maps:get(<<"bytes_ops_per_sec">>, Data),
        %% Floor of 1 op/s keeps the division defined.
        Speedup = BufOps / max(BytesOps, 1),

        io:format("~8B | ~14w | ~14w | ~.2f~n",
                  [Size, round(BufOps), round(BytesOps), Speedup]),
        {Size, Speedup}
    end, Sizes),

    %% Calculate average speedup for >= 1KB
    report_large_payload_speedup([S || {Size, S} <- SpeedupsList, Size >= 1024]),
    ok.

%% Prints the mean speedup and the matching percentage improvement for the
%% given list of speedups; prints nothing for an empty list.
report_large_payload_speedup([]) ->
    ok;
report_large_payload_speedup([_ | _] = Speedups) ->
    AvgSpeedup = lists:sum(Speedups) / length(Speedups),
    Improvement = (AvgSpeedup - 1.0) * 100,
    io:format("~nAverage speedup for payloads >= 1KB: ~.2f x~n", [AvgSpeedup]),
    io:format("Performance improvement: ~.1f%~n", [Improvement]).
203+
204+
%% Benchmark of an echo protocol driven through erlang.reactor.
%%
%% The embedded Python script defines an EchoProtocol, and for each payload
%% size repeatedly creates a non-blocking socket pair, registers one end
%% with the reactor, sends the payload from the other end and times only
%% the `reactor.on_read_ready` call. Per-size mean, median, 95th-percentile
%% sample (all in milliseconds) and operations per second are stored in
%% `_echo_bench_results`, keyed by size. The Erlang side prints them as a
%% table.
run_echo_protocol_bench() ->
    io:format("~n--- Echo Protocol Benchmark ---~n"),
    io:format("Iterations: 200~n~n"),

    PySource = <<"
import time
import socket
import statistics
import erlang.reactor as reactor

def run_echo_bench(iterations=200):
    class EchoProtocol(reactor.Protocol):
        def data_received(self, data):
            self.write_buffer.extend(data)
            return 'write_pending'

        def write_ready(self):
            if not self.write_buffer:
                return 'read_pending'
            written = self.write(bytes(self.write_buffer))
            del self.write_buffer[:written]
            return 'continue' if self.write_buffer else 'read_pending'

    results = {}
    sizes = [64, 256, 1024, 4096, 16384]

    for size in sizes:
        test_data = b'X' * size
        times = []

        for _ in range(iterations):
            s1, s2 = socket.socketpair()
            s1.setblocking(False)
            s2.setblocking(False)

            try:
                reactor.set_protocol_factory(EchoProtocol)
                reactor.init_connection(s1.fileno(), {'type': 'test'})

                s2.send(test_data)

                start = time.perf_counter()
                action = reactor.on_read_ready(s1.fileno())
                elapsed = time.perf_counter() - start
                times.append(elapsed)

                reactor.close_connection(s1.fileno())
            finally:
                s1.close()
                s2.close()

        avg_time = statistics.mean(times)
        results[size] = {
            'avg_time_ms': avg_time * 1000,
            'ops_per_sec': 1.0 / avg_time,
            'p50_ms': statistics.median(times) * 1000,
            'p95_ms': sorted(times)[int(len(times) * 0.95)] * 1000,
        }

    return results

_echo_bench_results = run_echo_bench()
">>,

    ok = py:exec(PySource),
    {ok, StatsBySize} = py:eval(<<"_echo_bench_results">>),

    io:format("~8s | ~10s | ~10s | ~10s | ~10s~n",
              ["Size", "Avg (ms)", "P50 (ms)", "P95 (ms)", "Ops/sec"]),
    io:format("~s~n", [lists:duplicate(56, $-)]),

    PrintRow = fun(Size) ->
        Stats = maps:get(Size, StatsBySize),

        Avg = maps:get(<<"avg_time_ms">>, Stats),
        Median = maps:get(<<"p50_ms">>, Stats),
        P95 = maps:get(<<"p95_ms">>, Stats),
        Rate = maps:get(<<"ops_per_sec">>, Stats),

        io:format("~8B | ~10.3f | ~10.3f | ~10.3f | ~10w~n",
                  [Size, Avg, Median, P95, round(Rate)])
    end,

    %% Must list the same sizes as the Python script above (no 65536 here).
    lists:foreach(PrintRow, [64, 256, 1024, 4096, 16384]),
    ok.

0 commit comments

Comments
 (0)