Skip to content

Commit e12ebe4

Browse files
authored
feat(zeromq): Allow external zmq.asyncio.Context injection (#43)
1 parent a127021 commit e12ebe4

4 files changed

Lines changed: 173 additions & 2 deletions

File tree

CLAUDE.md

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# Claude Code Guide for Callosum
2+
3+
## Project Overview
4+
5+
Callosum is an asyncio-based RPC library for Python that supports multiple transport backends (ZeroMQ, Redis, Thrift).
6+
7+
## Development Environment
8+
9+
This project uses **uv** as the package manager.
10+
11+
### Setup
12+
13+
```bash
14+
uv sync --extra zeromq --extra redis --extra thrift
15+
```
16+
17+
### Running Tests
18+
19+
```bash
20+
# Run all tests
21+
uv run pytest
22+
23+
# Run specific test file
24+
uv run pytest tests/test_rpc.py -v
25+
26+
# Run specific test
27+
uv run pytest tests/test_rpc.py::test_external_context_injection -v
28+
```
29+
30+
### Type Checking
31+
32+
```bash
33+
uv run mypy src/callosum
34+
```
35+
36+
### Linting
37+
38+
```bash
39+
uv run ruff check src/
40+
uv run ruff format src/
41+
```
42+
43+
## Project Structure
44+
45+
```
46+
src/callosum/
47+
├── lower/ # Transport layer (zeromq, redis)
48+
│ ├── __init__.py # BaseTransport, AbstractBinder, AbstractConnector
49+
│ ├── zeromq.py # ZeroMQ transport implementation
50+
│ └── redis.py # Redis transport implementation
51+
├── rpc/ # RPC layer
52+
│ ├── channel.py # Peer class (main RPC interface)
53+
│ ├── message.py # RPC message types
54+
│ └── exceptions.py
55+
├── auth.py # Authentication (CURVE for ZeroMQ)
56+
├── ordering.py # Async schedulers
57+
└── serial.py # Serialization utilities
58+
```
59+
60+
## Key Classes
61+
62+
- `Peer` (rpc/channel.py): Main RPC interface for both client and server
63+
- `ZeroMQBaseTransport` (lower/zeromq.py): ZeroMQ transport with context management
64+
- `ZeroMQRPCTransport`: RPC-specific transport using ROUTER/DEALER sockets
65+
66+
## Optional Dependencies
67+
68+
Defined in `pyproject.toml` under `[project.optional-dependencies]`:
69+
- `zeromq`: pyzmq for ZeroMQ transport
70+
- `redis`: redis-py for Redis transport
71+
- `thrift`: thriftpy2 for Thrift serialization
72+
- `snappy`: python-snappy for compression

changes/43.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
lower.zeromq: Add support for external `zmq.asyncio.Context` injection via `transport_opts["zctx"]` to allow sharing a single context across multiple Peer instances

src/callosum/lower/zeromq.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,7 @@ class ZeroMQBaseTransport(BaseTransport):
482482

483483
__slots__ = BaseTransport.__slots__ + (
484484
"_zctx",
485+
"_external_zctx",
485486
"_zsock_opts",
486487
"_zap_server",
487488
"_zap_task",
@@ -501,7 +502,9 @@ def __init__(
501502
super().__init__(authenticator, **kwargs)
502503
self._zap_server = None
503504
self._zap_task = None
504-
self._zctx = zmq.asyncio.Context()
505+
# Support external context injection via transport_opts["zctx"]
506+
self._external_zctx = self.transport_opts.get("zctx") is not None
507+
self._zctx = self.transport_opts.get("zctx") or zmq.asyncio.Context()
505508
match self.authenticator:
506509
case AbstractServerAuthenticator() as auth:
507510
self._zap_server = ZAPServer(self._zctx, auth)
@@ -524,7 +527,8 @@ async def close(self) -> None:
524527
self._zap_task.cancel()
525528
with contextlib.suppress(asyncio.CancelledError):
526529
await self._zap_task
527-
if self._zctx is not None:
530+
# Do not destroy externally injected context
531+
if self._zctx is not None and not self._external_zctx:
528532
self._zctx.destroy(linger=50)
529533

530534

tests/test_rpc.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,3 +295,97 @@ async def _do_request(idx: int) -> int:
295295
assert e.args[0] == "ZeroDivisionError"
296296
else:
297297
assert call_results[idx] == idx
298+
299+
300+
@pytest.mark.asyncio
301+
async def test_external_context_injection() -> None:
302+
"""Test that an externally injected zmq context is used and not destroyed on close."""
303+
import zmq.asyncio
304+
305+
# Create an external context
306+
external_zctx = zmq.asyncio.Context()
307+
308+
async def func(request: RPCMessage) -> str:
309+
return "ok"
310+
311+
# Create server with external context
312+
server = Peer(
313+
bind=ZeroMQAddress("tcp://127.0.0.1:5021"),
314+
transport=ZeroMQRPCTransport,
315+
transport_opts={"zctx": external_zctx},
316+
scheduler=ExitOrderedAsyncScheduler(),
317+
serializer=lambda o: json.dumps(o).encode("utf8"),
318+
deserializer=lambda b: json.loads(b),
319+
)
320+
server.handle_function("func", func)
321+
322+
# Create client with external context
323+
client = Peer(
324+
connect=ZeroMQAddress("tcp://localhost:5021"),
325+
transport=ZeroMQRPCTransport,
326+
transport_opts={"zctx": external_zctx},
327+
serializer=lambda o: json.dumps(o).encode("utf8"),
328+
deserializer=lambda b: json.loads(b),
329+
)
330+
331+
# Verify the external context is used
332+
server_transport = cast(ZeroMQRPCTransport, server._transport)
333+
client_transport = cast(ZeroMQRPCTransport, client._transport)
334+
assert server_transport._zctx is external_zctx
335+
assert server_transport._external_zctx is True
336+
assert client_transport._zctx is external_zctx
337+
assert client_transport._external_zctx is True
338+
339+
async with server:
340+
async with client:
341+
result = await client.invoke("func", {})
342+
assert result == "ok"
343+
344+
# Verify context is NOT destroyed after transport close
345+
assert not external_zctx.closed
346+
347+
# Clean up
348+
external_zctx.destroy(linger=0)
349+
350+
351+
@pytest.mark.asyncio
352+
async def test_internal_context_destroyed_on_close() -> None:
353+
"""Test that internally created zmq context is destroyed on close."""
354+
355+
async def func(request: RPCMessage) -> str:
356+
return "ok"
357+
358+
# Create server without external context (uses internal)
359+
server = Peer(
360+
bind=ZeroMQAddress("tcp://127.0.0.1:5022"),
361+
transport=ZeroMQRPCTransport,
362+
scheduler=ExitOrderedAsyncScheduler(),
363+
serializer=lambda o: json.dumps(o).encode("utf8"),
364+
deserializer=lambda b: json.loads(b),
365+
)
366+
server.handle_function("func", func)
367+
368+
# Create client without external context (uses internal)
369+
client = Peer(
370+
connect=ZeroMQAddress("tcp://localhost:5022"),
371+
transport=ZeroMQRPCTransport,
372+
serializer=lambda o: json.dumps(o).encode("utf8"),
373+
deserializer=lambda b: json.loads(b),
374+
)
375+
376+
# Verify internal context is created
377+
server_transport = cast(ZeroMQRPCTransport, server._transport)
378+
client_transport = cast(ZeroMQRPCTransport, client._transport)
379+
assert server_transport._external_zctx is False
380+
assert client_transport._external_zctx is False
381+
server_zctx = server_transport._zctx
382+
client_zctx = client_transport._zctx
383+
384+
async with server:
385+
async with client:
386+
result = await client.invoke("func", {})
387+
assert result == "ok"
388+
389+
# Verify context IS destroyed after transport close
390+
assert server_zctx.closed
391+
assert client_zctx.closed

0 commit comments

Comments
 (0)