Skip to content

Commit d6a836c

Browse files
committed
Add cached memoryview and memoryview() method to ReactorBuffer
1 parent 311de62 commit d6a836c

4 files changed

Lines changed: 341 additions & 4 deletions

File tree

c_src/py_reactor_buffer.c

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,30 @@ static PyBufferProcs ReactorBuffer_as_buffer = {
143143
.bf_releasebuffer = ReactorBuffer_releasebuffer,
144144
};
145145

146+
/* ============================================================================
147+
* Cached Memoryview Helper
148+
* ============================================================================ */
149+
150+
/**
151+
* Get or create the cached memoryview for fast buffer operations.
152+
* The memoryview is created lazily and cached for subsequent accesses.
153+
*/
154+
static PyObject *ReactorBuffer_get_memoryview(ReactorBufferObject *self) {
155+
if (self->cached_memoryview == NULL) {
156+
if (self->resource == NULL || self->resource->data == NULL) {
157+
PyErr_SetString(PyExc_BufferError, "Buffer has been released");
158+
return NULL;
159+
}
160+
/* Create memoryview from self (uses our buffer protocol) */
161+
self->cached_memoryview = PyMemoryView_FromObject((PyObject *)self);
162+
if (self->cached_memoryview == NULL) {
163+
return NULL;
164+
}
165+
}
166+
Py_INCREF(self->cached_memoryview);
167+
return self->cached_memoryview;
168+
}
169+
146170
/* ============================================================================
147171
* Python Sequence Protocol
148172
* ============================================================================ */
@@ -230,6 +254,7 @@ static PyMappingMethods ReactorBuffer_as_mapping = {
230254
* ============================================================================ */
231255

232256
static void ReactorBuffer_dealloc(ReactorBufferObject *self) {
257+
Py_CLEAR(self->cached_memoryview);
233258
if (self->resource_ref != NULL) {
234259
enif_release_resource(self->resource_ref);
235260
self->resource_ref = NULL;
@@ -246,6 +271,11 @@ static PyObject *ReactorBuffer_bytes(ReactorBufferObject *self, PyObject *Py_UNU
246271
self->resource->size);
247272
}
248273

274+
/* Return memoryview for zero-copy access */
275+
static PyObject *ReactorBuffer_memoryview(ReactorBufferObject *self, PyObject *Py_UNUSED(ignored)) {
276+
return ReactorBuffer_get_memoryview(self);
277+
}
278+
249279
static PyObject *ReactorBuffer_repr(ReactorBufferObject *self) {
250280
if (self->resource == NULL) {
251281
return PyUnicode_FromString("<ReactorBuffer (released)>");
@@ -663,6 +693,8 @@ static PyObject *ReactorBuffer_test_create(PyTypeObject *type, PyObject *args) {
663693
static PyMethodDef ReactorBuffer_methods[] = {
664694
{"__bytes__", (PyCFunction)ReactorBuffer_bytes, METH_NOARGS,
665695
"Return bytes copy of buffer"},
696+
{"memoryview", (PyCFunction)ReactorBuffer_memoryview, METH_NOARGS,
697+
"Return a memoryview for zero-copy access"},
666698
{"startswith", (PyCFunction)ReactorBuffer_startswith, METH_VARARGS,
667699
"Return True if buffer starts with prefix"},
668700
{"endswith", (PyCFunction)ReactorBuffer_endswith, METH_VARARGS,
@@ -724,6 +756,7 @@ PyObject *ReactorBuffer_from_resource(reactor_buffer_resource_t *resource,
724756

725757
obj->resource = resource;
726758
obj->resource_ref = resource_ref;
759+
obj->cached_memoryview = NULL; /* Created lazily on first use */
727760
enif_keep_resource(resource_ref);
728761

729762
return (PyObject *)obj;

c_src/py_reactor_buffer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,15 @@ extern PyTypeObject ReactorBufferType;
9191
/**
9292
* @struct ReactorBufferObject
9393
* @brief Python object wrapping a reactor buffer resource
94+
*
95+
* Uses a cached memoryview internally for optimal buffer protocol performance.
96+
* The memoryview is created lazily on first buffer access.
9497
*/
9598
typedef struct {
9699
/* Standard CPython object header; must remain the first member. */
PyObject_HEAD
97100
reactor_buffer_resource_t *resource; /**< NIF resource (we hold a reference) */
98101
/* Handle passed to enif_keep_resource() / enif_release_resource(). */
void *resource_ref; /**< For releasing the resource */
102+
/* Initialised to NULL in ReactorBuffer_from_resource() and dropped with
 * Py_CLEAR() in ReactorBuffer_dealloc(). */
PyObject *cached_memoryview; /**< Cached memoryview for fast buffer access */
99103
} ReactorBufferObject;
100104

101105
/* ============================================================================

docs/reactor.md

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,10 +226,19 @@ def data_received(self, data):
226226

227227
### Performance Considerations
228228

229-
- For small reads (<1KB), the overhead of buffer management may exceed benefits
230-
- For large reads (>=1KB), zero-copy provides 15-25% throughput improvement
231-
- Use `memoryview()` for parsing without copying
232-
- Call `bytes(data)` only when you need a persistent copy
229+
The zero-copy benefit is in the NIF read path - data is read directly into a buffer that Python wraps without copying. This avoids the overhead of creating a Python bytes object for every read.
230+
231+
- **NIF read path**: Data goes directly from kernel to Python without intermediate copies
232+
- **Parsing operations**: `startswith()`, `find()` etc. are optimized C implementations
233+
- **Direct memoryview access**: Use `data.memoryview()` for maximum zero-copy performance
234+
- **Creating bytes**: Call `bytes(data)` only when you need a persistent copy
235+
236+
```python
237+
# For maximum performance, use memoryview slicing for comparisons
238+
mv = data.memoryview()
239+
if mv[:3] == b'GET':
240+
    pass  # Process GET request
241+
```
233242

234243
## Action Return Values
235244

examples/bench_reactor_buffer.py

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
#!/usr/bin/env python3
2+
"""Benchmark for ReactorBuffer zero-copy performance.
3+
4+
Compares:
5+
1. ReactorBuffer with zero-copy access (buffer protocol)
6+
2. Regular bytes with data copying
7+
8+
Run: python3 examples/bench_reactor_buffer.py
9+
"""
10+
11+
import time
12+
import socket
13+
import statistics
14+
import sys
15+
16+
sys.path.insert(0, 'priv')
17+
18+
19+
def bench_buffer_operations(iterations=10000):
    """Time basic operations on a ReactorBuffer against the same ops on bytes.

    For each payload size, four operations are timed twice -- once on a
    buffer built with ``erlang.ReactorBuffer._test_create`` and once on a
    ``bytes`` object with the same content: memoryview creation plus one
    index, ``bytearray.extend``, a slice of at most 100 bytes, and
    ``startswith`` with a 10-byte prefix.

    Args:
        iterations: How many times each operation is repeated per timing.

    Returns:
        dict mapping payload size -> operation name ->
        ``{'buffer': seconds, 'bytes': seconds}``, where each value is the
        total ``time.perf_counter`` delta over all iterations.
    """
    import erlang

    timings = {}

    for nbytes in [64, 256, 1024, 4096, 16384, 65536]:
        payload = b'X' * nbytes
        buf = erlang.ReactorBuffer._test_create(payload)
        plain = bytes(payload)
        row = {}

        # The loops are written out inline on purpose so that no helper-call
        # overhead ends up inside a timed region.

        # memoryview creation + single index
        t0 = time.perf_counter()
        for _ in range(iterations):
            mv = memoryview(buf)
            _ = mv[0]
        on_buffer = time.perf_counter() - t0

        t0 = time.perf_counter()
        for _ in range(iterations):
            mv = memoryview(plain)
            _ = mv[0]
        on_bytes = time.perf_counter() - t0
        row['memoryview'] = {'buffer': on_buffer, 'bytes': on_bytes}

        # bytearray.extend (consumes the source through the buffer protocol)
        t0 = time.perf_counter()
        for _ in range(iterations):
            sink = bytearray()
            sink.extend(buf)
        on_buffer = time.perf_counter() - t0

        t0 = time.perf_counter()
        for _ in range(iterations):
            sink = bytearray()
            sink.extend(plain)
        on_bytes = time.perf_counter() - t0
        row['extend'] = {'buffer': on_buffer, 'bytes': on_bytes}

        # leading slice, capped at 100 bytes
        cut = min(100, nbytes)
        t0 = time.perf_counter()
        for _ in range(iterations):
            _ = buf[0:cut]
        on_buffer = time.perf_counter() - t0

        t0 = time.perf_counter()
        for _ in range(iterations):
            _ = plain[0:cut]
        on_bytes = time.perf_counter() - t0
        row['slice'] = {'buffer': on_buffer, 'bytes': on_bytes}

        # startswith with the first 10 bytes of the payload
        head = payload[:10]
        t0 = time.perf_counter()
        for _ in range(iterations):
            _ = buf.startswith(head)
        on_buffer = time.perf_counter() - t0

        t0 = time.perf_counter()
        for _ in range(iterations):
            _ = plain.startswith(head)
        on_bytes = time.perf_counter() - t0
        row['startswith'] = {'buffer': on_buffer, 'bytes': on_bytes}

        timings[nbytes] = row

    return timings
95+
96+
97+
def bench_protocol_simulation(iterations=1000):
    """Simulate a protocol's data_received path at several payload sizes.

    A small HTTP-like request is padded with ``b'X'`` to each target size,
    then the same parsing routine (method check via ``startswith``, header
    terminator search via ``find``, copy into a ``bytearray``) is timed on a
    ReactorBuffer created with ``erlang.ReactorBuffer._test_create`` and on
    an equivalent ``bytes`` object.

    Args:
        iterations: Number of parse calls per timing.

    Returns:
        dict mapping payload size -> ``{'buffer_time', 'bytes_time',
        'ops_per_sec_buffer', 'ops_per_sec_bytes'}``; times are total
        ``time.perf_counter`` deltas in seconds.
    """
    import erlang

    request_head = b'GET / HTTP/1.1\r\nHost: example.com\r\n\r\n'

    # Defined once: the routine does not depend on the payload size, so
    # there is no need to rebuild the closure on every loop pass.
    def parse_with_buffer(data):
        # Check method (result is intentionally unused; this is workload)
        if data.startswith(b'GET'):
            method = 'GET'
        elif data.startswith(b'POST'):
            method = 'POST'
        else:
            method = 'OTHER'

        # Find header end (again, only the work matters)
        pos = data.find(b'\r\n\r\n')

        # Copy the incoming data into a write buffer
        write_buf = bytearray()
        write_buf.extend(data)

        return len(write_buf)

    results = {}
    sizes = [64, 256, 1024, 4096, 16384, 65536]

    for size in sizes:
        # Pad by the header's real length. The previous hard-coded 40 was
        # larger than the 37-byte header, which left every payload 3 bytes
        # short of its nominal size.
        test_data = request_head + b'X' * max(0, size - len(request_head))
        test_data = test_data[:size]  # Truncate if size < header length

        buf = erlang.ReactorBuffer._test_create(test_data)
        regular_bytes = bytes(test_data)

        # Benchmark with ReactorBuffer
        start = time.perf_counter()
        for _ in range(iterations):
            _ = parse_with_buffer(buf)
        buf_time = time.perf_counter() - start

        # Benchmark with regular bytes
        start = time.perf_counter()
        for _ in range(iterations):
            _ = parse_with_buffer(regular_bytes)
        bytes_time = time.perf_counter() - start

        results[size] = {
            'buffer_time': buf_time,
            'bytes_time': bytes_time,
            'ops_per_sec_buffer': iterations / buf_time,
            'ops_per_sec_bytes': iterations / bytes_time,
        }

    return results
150+
151+
152+
def bench_echo_protocol(iterations=500):
    """Time ``reactor.on_read_ready`` for an echo protocol over a socketpair.

    For every payload size and iteration a fresh non-blocking socketpair is
    created, one end is registered with the reactor, the payload is sent
    from the other end, and only the ``on_read_ready`` call is timed.

    Args:
        iterations: Number of timed reads per payload size.

    Returns:
        dict mapping payload size -> ``{'avg_time_ms', 'ops_per_sec',
        'p50_ms', 'p95_ms'}``.
    """
    import erlang.reactor as reactor

    class EchoProtocol(reactor.Protocol):
        """Queues everything it receives and writes it back."""

        def data_received(self, data):
            self.write_buffer.extend(data)
            return 'write_pending'

        def write_ready(self):
            if not self.write_buffer:
                return 'read_pending'
            written = self.write(bytes(self.write_buffer))
            del self.write_buffer[:written]
            if self.write_buffer:
                return 'continue'
            return 'read_pending'

    summary = {}

    for nbytes in [64, 256, 1024, 4096, 16384]:
        payload = b'X' * nbytes
        samples = []

        for _ in range(iterations):
            reactor_end, peer_end = socket.socketpair()
            reactor_end.setblocking(False)
            peer_end.setblocking(False)

            try:
                reactor.set_protocol_factory(EchoProtocol)
                reactor.init_connection(reactor_end.fileno(), {'type': 'test'})

                peer_end.send(payload)

                # Only the read dispatch itself is inside the timed region.
                t0 = time.perf_counter()
                reactor.on_read_ready(reactor_end.fileno())
                samples.append(time.perf_counter() - t0)

                reactor.close_connection(reactor_end.fileno())
            finally:
                reactor_end.close()
                peer_end.close()

        mean_s = statistics.mean(samples)
        ordered = sorted(samples)
        summary[nbytes] = {
            'avg_time_ms': mean_s * 1000,
            'ops_per_sec': 1.0 / mean_s,
            'p50_ms': statistics.median(samples) * 1000,
            'p95_ms': ordered[int(len(samples) * 0.95)] * 1000,
        }

    return summary
205+
206+
207+
def format_results(name, results):
    """Print one benchmark's results as a fixed-width table on stdout.

    The table layout is chosen from the keys of the first result row:
    a row containing ``'memoryview'`` is treated as buffer-operation
    timings, a row containing ``'buffer_time'`` as protocol-simulation
    results, and anything else as echo-protocol statistics.

    Args:
        name: Title printed in the banner above the table.
        results: dict mapping payload size to that benchmark's row dict.
            An empty dict prints the banner followed by ``(no results)``
            instead of raising.
    """
    rule = '=' * 60
    print(f"\n{rule}")
    print(f" {name}")
    print(rule)

    # Nothing to inspect: without this guard, peeking at the first row
    # would raise on an empty mapping.
    if not results:
        print("(no results)")
        return

    first_row = next(iter(results.values()))

    if 'memoryview' in first_row:
        # Buffer operations format
        print(f"{'Size':>8} | {'Operation':>12} | {'Buffer (ms)':>12} | {'Bytes (ms)':>12} | {'Ratio':>8}")
        print("-" * 60)
        for size, ops in sorted(results.items()):
            for op_name, times in ops.items():
                buf_ms = times['buffer'] * 1000
                bytes_ms = times['bytes'] * 1000
                ratio = bytes_ms / buf_ms if buf_ms > 0 else 0
                print(f"{size:>8} | {op_name:>12} | {buf_ms:>12.3f} | {bytes_ms:>12.3f} | {ratio:>7.2f}x")
    elif 'buffer_time' in first_row:
        # Protocol simulation format
        print(f"{'Size':>8} | {'Buffer (ops/s)':>14} | {'Bytes (ops/s)':>14} | {'Speedup':>8}")
        print("-" * 60)
        for size, data in sorted(results.items()):
            buf_ops = data['ops_per_sec_buffer']
            bytes_ops = data['ops_per_sec_bytes']
            speedup = buf_ops / bytes_ops if bytes_ops > 0 else 0
            print(f"{size:>8} | {buf_ops:>14.0f} | {bytes_ops:>14.0f} | {speedup:>7.2f}x")
    else:
        # Echo protocol format
        print(f"{'Size':>8} | {'Avg (ms)':>10} | {'P50 (ms)':>10} | {'P95 (ms)':>10} | {'Ops/sec':>10}")
        print("-" * 60)
        for size, data in sorted(results.items()):
            print(f"{size:>8} | {data['avg_time_ms']:>10.3f} | {data['p50_ms']:>10.3f} | {data['p95_ms']:>10.3f} | {data['ops_per_sec']:>10.0f}")
238+
239+
240+
def main():
    """Run the three benchmarks in order and print their tables and a summary.

    Exits with status 1 if ``erlang.ReactorBuffer._test_create`` cannot be
    imported or called. The summary averages the buffer/bytes speedup of
    the protocol simulation over payloads of at least 1024 bytes.
    """
    rule = "=" * 60

    print("\n" + rule)
    print(" ReactorBuffer Zero-Copy Benchmark")
    print(rule)

    # Probe for ReactorBuffer before doing any real work.
    try:
        import erlang
        probe = erlang.ReactorBuffer._test_create(b'test')
        print(f"ReactorBuffer available: {type(probe).__name__}")
    except Exception as e:
        print(f"ERROR: ReactorBuffer not available: {e}")
        sys.exit(1)

    print("\nRunning benchmarks...")

    print("\n[1/3] Buffer operations benchmark...")
    buffer_ops = bench_buffer_operations(iterations=10000)
    format_results("Buffer Operations (10000 iterations)", buffer_ops)

    print("\n[2/3] Protocol simulation benchmark...")
    protocol_sim = bench_protocol_simulation(iterations=5000)
    format_results("Protocol Simulation (5000 iterations)", protocol_sim)

    print("\n[3/3] Echo protocol benchmark...")
    echo_proto = bench_echo_protocol(iterations=200)
    format_results("Echo Protocol (200 iterations)", echo_proto)

    print("\n" + rule)
    print(" Summary")
    print(rule)

    # Buffer-vs-bytes speedup, restricted to payloads of 1 KB and above.
    large_payload_speedups = [
        row['ops_per_sec_buffer'] / row['ops_per_sec_bytes']
        for size, row in protocol_sim.items()
        if size >= 1024
    ]

    if large_payload_speedups:
        avg_speedup = statistics.mean(large_payload_speedups)
        print(f"Average speedup for payloads >= 1KB: {avg_speedup:.2f}x")
        improvement = (avg_speedup - 1.0) * 100
        print(f"Performance improvement: {improvement:+.1f}%")

    print("\nDone.")


if __name__ == '__main__':
    main()

0 commit comments

Comments
 (0)