Skip to content

Commit d100f77

Browse files
committed
Add reactor.signal_write_ready(fd) API
Provides a clean API for protocols to signal async completion instead of manually sending to the reactor PID. The reactor PID is stored per-fd in init_connection and cleaned up in close_connection.
1 parent 4321817 commit d100f77

2 files changed

Lines changed: 29 additions & 17 deletions

File tree

priv/_erlang_impl/_reactor.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def write_ready(self):
4747
'on_read_ready',
4848
'on_write_ready',
4949
'close_connection',
50+
'signal_write_ready',
5051
]
5152

5253

@@ -153,6 +154,7 @@ def write(self, data: bytes) -> int:
153154
# =============================================================================
154155

155156
_protocols: Dict[int, Protocol] = {}
157+
_reactor_pids: Dict[int, object] = {} # fd -> reactor PID
156158
_protocol_factory: Optional[Callable[[], Protocol]] = None
157159

158160

@@ -193,11 +195,15 @@ def init_connection(fd: int, client_info: dict):
193195
fd: File descriptor
194196
client_info: Connection metadata from Erlang
195197
"""
196-
global _protocols, _protocol_factory
198+
global _protocols, _protocol_factory, _reactor_pids
197199
if _protocol_factory is not None:
198200
proto = _protocol_factory()
199201
proto.connection_made(fd, client_info)
200202
_protocols[fd] = proto
203+
# Store reactor PID for signal_write_ready
204+
reactor_pid = client_info.get('reactor_pid')
205+
if reactor_pid is not None:
206+
_reactor_pids[fd] = reactor_pid
201207

202208

203209
def on_read_ready(fd: int) -> str:
@@ -246,6 +252,27 @@ def close_connection(fd: int):
246252
fd: File descriptor
247253
"""
248254
proto = _protocols.pop(fd, None)
255+
_reactor_pids.pop(fd, None)
249256
if proto is not None:
250257
proto.closed = True
251258
proto.connection_lost()
259+
260+
261+
def signal_write_ready(fd: int) -> bool:
262+
"""Signal the reactor that a response is ready for the given fd.
263+
264+
Call this after an async task completes and the response buffer is ready.
265+
The reactor will then trigger write selection for the fd.
266+
267+
Args:
268+
fd: File descriptor with pending response
269+
270+
Returns:
271+
True if signal was sent, False if no reactor PID registered
272+
"""
273+
import erlang
274+
reactor_pid = _reactor_pids.get(fd)
275+
if reactor_pid is not None:
276+
erlang.send(reactor_pid, ('write_ready', fd))
277+
return True
278+
return False

test/py_reactor_SUITE.erl

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -215,35 +215,20 @@ _close_test_result = run_close_test()
215215
async_pending_test(_Config) ->
216216
%% Protocol factory code to run in reactor context
217217
SetupCode = <<"
218-
import erlang
219218
import erlang.reactor as reactor
220219
221220
class AsyncPendingProtocol(reactor.Protocol):
222221
'''Protocol that returns async_pending and signals completion.'''
223222
224223
def __init__(self):
225224
super().__init__()
226-
self.reactor_pid = None
227225
self.pending_response = b''
228226
229-
def connection_made(self, fd, client_info):
230-
super().connection_made(fd, client_info)
231-
self.reactor_pid = client_info.get('reactor_pid')
232-
233227
def data_received(self, data):
234-
import sys
235228
self.pending_response = b'ASYNC:' + data
236229
# Immediately complete the task and signal reactor
237230
self.write_buffer.extend(self.pending_response)
238-
if self.reactor_pid:
239-
print(f'Sending write_ready to {self.reactor_pid} for fd={self.fd}', file=sys.stderr)
240-
try:
241-
erlang.send(self.reactor_pid, ('write_ready', self.fd))
242-
print('write_ready sent successfully', file=sys.stderr)
243-
except Exception as e:
244-
print(f'erlang.send failed: {e}', file=sys.stderr)
245-
else:
246-
print('No reactor_pid available!', file=sys.stderr)
231+
reactor.signal_write_ready(self.fd)
247232
return 'async_pending'
248233
249234
def write_ready(self):

0 commit comments

Comments
 (0)