diff --git a/setup.py b/setup.py index a1dad09e..97a27bf1 100644 --- a/setup.py +++ b/setup.py @@ -31,6 +31,7 @@ def read(*names, **kwargs): optional_requirements = { "dev": dev_requirements, + "zeromq": ["pyzmq>=19.0"], } setup( diff --git a/src/compas_eve/zeromq/__init__.py b/src/compas_eve/zeromq/__init__.py new file mode 100644 index 00000000..5ecc86ff --- /dev/null +++ b/src/compas_eve/zeromq/__init__.py @@ -0,0 +1,255 @@ +""" +******************************************************************************** +compas_eve.zeromq +******************************************************************************** + +.. currentmodule:: compas_eve.zeromq + + +Classes +======= + +.. autosummary:: + :toctree: generated/ + :nosignatures: + + ZeroMQTransport + +""" + +from ..core import Transport +from ..event_emitter import EventEmitterMixin + +try: + import zmq +except ImportError: + zmq = None + +__all__ = ["ZeroMQTransport"] + + +class ZeroMQTransport(Transport, EventEmitterMixin): + """ZeroMQ transport allows sending and receiving messages using ZeroMQ pub/sub sockets. + + Parameters + ---------- + endpoint : str + Endpoint for the pub/sub communication, e.g. ``tcp://localhost:5555`` or ``inproc://test``. + bind_subscriber : bool, optional + If True, the subscriber socket will bind to the endpoint and publisher will connect. + If False, the publisher will bind to the endpoint and subscriber will connect. + Defaults to True for most use cases. + """ + + def __init__(self, endpoint, bind_subscriber=True, *args, **kwargs): + if zmq is None: + raise ImportError("pyzmq is required for ZeroMQ transport. Please install it with: pip install pyzmq or conda install pyzmq") + + super(ZeroMQTransport, self).__init__(*args, **kwargs) + + self.endpoint = endpoint + self.bind_subscriber = bind_subscriber + self._is_connected = False + self._local_callbacks = {} + + # Create ZeroMQ context and sockets + self.context = zmq.Context() + self.pub_socket = self.context.socket(zmq.PUB) + self.sub_socket = self.context.socket(zmq.SUB) + + # Configure sockets based on bind_subscriber setting + if self.bind_subscriber: + # Subscriber binds, publisher connects - good for many publishers, few subscribers + self.sub_socket.bind(self.endpoint) + self.pub_socket.connect(self.endpoint) + else: + # Publisher binds, subscriber connects - good for one publisher, many subscribers + self.pub_socket.bind(self.endpoint) + self.sub_socket.connect(self.endpoint) + + # Set up polling for subscriber + self.poller = zmq.Poller() + self.poller.register(self.sub_socket, zmq.POLLIN) + + # Mark as connected (ZeroMQ doesn't have explicit connection state) + self._is_connected = True + + # Start polling thread for incoming messages + import threading + self._polling = True + self._poll_thread = threading.Thread(target=self._poll_messages) + self._poll_thread.daemon = True + self._poll_thread.start() + + # Emit ready event + self.emit("ready") + + def close(self): + """Close the ZeroMQ sockets and context.""" + self._polling = False + if hasattr(self, '_poll_thread'): + self._poll_thread.join(timeout=1) + + self.pub_socket.close() + self.sub_socket.close() + self.context.term() + + def _poll_messages(self): + """Poll for incoming messages in a separate thread.""" + while self._polling: + try: + # Poll with timeout to allow thread termination + socks = dict(self.poller.poll(100)) # 100ms timeout + if self.sub_socket in socks: + # Receive topic and message + topic_bytes = self.sub_socket.recv(zmq.NOBLOCK) + message_bytes = self.sub_socket.recv(zmq.NOBLOCK) + + topic_name = topic_bytes.decode('utf-8') + message_str = message_bytes.decode('utf-8') + + # Emit the message event + event_key = "event:{}".format(topic_name) + self.emit(event_key, message_str) + + except zmq.Again: + # No message available, continue polling + continue + except Exception as e: + # Emit error but continue polling + self.emit("error", e) + + def on_ready(self, callback): + """Allows to hook-up to the event triggered when the transport is ready. + + Parameters + ---------- + callback : function + Function to invoke when the connection is established. + """ + if self._is_connected: + callback() + else: + self.once("ready", callback) + + def publish(self, topic, message): + """Publish a message to a topic. + + Parameters + ---------- + topic : :class:`Topic` + Instance of the topic to publish to. + message : :class:`Message` + Instance of the message to publish. + """ + def _callback(**kwargs): + json_message = topic._message_to_json(message) + + # Send topic and message as separate frames + self.pub_socket.send_string(topic.name, zmq.SNDMORE) + self.pub_socket.send_string(json_message) + + self.on_ready(_callback) + + def subscribe(self, topic, callback): + """Subscribe to a topic. + + Every time a new message is received on the topic, the callback will be invoked. + + Parameters + ---------- + topic : :class:`Topic` + Instance of the topic to subscribe to. + callback : function + Callback to invoke whenever a new message arrives. The callback should + receive only one `msg` argument, e.g. ``lambda msg: print(msg)``. + + Returns + ------- + str + Returns an identifier of the subscription. + """ + event_key = "event:{}".format(topic.name) + subscribe_id = "{}:{}".format(event_key, id(callback)) + + def _local_callback(message_str): + msg = topic._message_from_json(message_str) + callback(msg) + + def _subscribe_callback(**kwargs): + # Subscribe to the topic on ZeroMQ socket + self.sub_socket.setsockopt_string(zmq.SUBSCRIBE, topic.name) + + # Register local callback for this topic + self.on(event_key, _local_callback) + + self._local_callbacks[subscribe_id] = _local_callback + + self.on_ready(_subscribe_callback) + + return subscribe_id + + def advertise(self, topic): + """Announce this code will publish messages to the specified topic. + + This call has no effect on this transport implementation. + + Parameters + ---------- + topic : :class:`Topic` + Instance of the topic to advertise. + + Returns + ------- + str + Advertising identifier. + """ + advertise_id = "advertise:{}:{}".format(topic.name, self.id_counter) + # ZeroMQ does not need explicit advertising + return advertise_id + + def unadvertise(self, topic): + """Announce that this code will stop publishing messages to the specified topic. + + This call has no effect on this transport implementation. + + Parameters + ---------- + topic : :class:`Topic` + Instance of the topic to stop publishing messages to. + """ + pass + + def unsubscribe_by_id(self, subscribe_id): + """Unsubscribe from the specified topic based on the subscription id. + + Parameters + ---------- + subscribe_id : str + Identifier of the subscription. + """ + ev_type, topic_name, _callback_id = subscribe_id.split(":") + event_key = "{}:{}".format(ev_type, topic_name) + + callback = self._local_callbacks[subscribe_id] + self.off(event_key, callback) + + # Unsubscribe from ZeroMQ socket + self.sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, topic_name) + + del self._local_callbacks[subscribe_id] + + def unsubscribe(self, topic): + """Unsubscribe from the specified topic. + + Parameters + ---------- + topic : :class:`Topic` + Instance of the topic to unsubscribe from. + """ + # Unsubscribe from ZeroMQ socket + self.sub_socket.setsockopt_string(zmq.UNSUBSCRIBE, topic.name) + + # Remove all local listeners for this topic + event_key = "event:{}".format(topic.name) + self.remove_all_listeners(event_key) \ No newline at end of file diff --git a/tests/integration/test_zeromq.py b/tests/integration/test_zeromq.py new file mode 100644 index 00000000..a52b4b90 --- /dev/null +++ b/tests/integration/test_zeromq.py @@ -0,0 +1,151 @@ +from threading import Event + +from compas_eve import Message +from compas_eve import Publisher +from compas_eve import Subscriber +from compas_eve import Topic +from compas_eve import set_default_transport +from compas_eve.zeromq import ZeroMQTransport + +import pytest + + +def test_zeromq_tcp_pubsub(): + """Test ZeroMQ transport with TCP endpoints.""" + tx = ZeroMQTransport("tcp://localhost:25555") + event = Event() + topic = Topic("/messages_compas_eve_test/tcp_pubsub/", Message) + + Subscriber(topic, lambda m: event.set(), transport=tx).subscribe() + + # Small delay to ensure subscriber is ready + import time + time.sleep(0.1) + + Publisher(topic, transport=tx).publish(Message(done=True)) + + received = event.wait(timeout=3) + assert received, "Message not received" + + tx.close() + + +def test_zeromq_tcp_message_content(): + """Test that message content is preserved correctly with TCP transport.""" + tx = ZeroMQTransport("tcp://localhost:25556") + event = Event() + topic = Topic("/messages_compas_eve_test/tcp_content/", Message) + + result = {} + + def callback(msg): + result["message"] = msg + event.set() + + Subscriber(topic, callback, transport=tx).subscribe() + + # Small delay to ensure subscriber is ready + import time + time.sleep(0.1) + + test_message = Message( + name="ZeroMQ Test", + value=42, + nested={"key": "value", "list": [1, 2, 3]} + ) + Publisher(topic, transport=tx).publish(test_message) + + received = event.wait(timeout=3) + assert received, "Message not received" + assert result["message"].name == "ZeroMQ Test" + assert result["message"].value == 42 + assert result["message"].nested["key"] == "value" + assert result["message"].nested["list"] == [1, 2, 3] + + tx.close() + + +def test_zeromq_tcp_multiple_topics(): + """Test that multiple topics work correctly with TCP transport.""" + tx = ZeroMQTransport("tcp://localhost:25557") + + topic1 = Topic("/test/topic1", Message) + topic2 = Topic("/test/topic2", Message) + + event1 = Event() + event2 = Event() + + result = {} + + def callback1(msg): + result["topic1"] = msg + event1.set() + + def callback2(msg): + result["topic2"] = msg + event2.set() + + # Subscribe to both topics + Subscriber(topic1, callback1, transport=tx).subscribe() + Subscriber(topic2, callback2, transport=tx).subscribe() + + # Small delay to ensure subscribers are ready + import time + time.sleep(0.1) + + # Publish to topic1 + Publisher(topic1, transport=tx).publish(Message(source="topic1")) + + # Publish to topic2 + Publisher(topic2, transport=tx).publish(Message(source="topic2")) + + received1 = event1.wait(timeout=3) + received2 = event2.wait(timeout=3) + + assert received1, "Message 1 not received" + assert received2, "Message 2 not received" + assert result["topic1"].source == "topic1" + assert result["topic2"].source == "topic2" + + tx.close() + + +def test_zeromq_tcp_unsubscribe(): + """Test unsubscribe functionality with TCP transport.""" + tx = ZeroMQTransport("tcp://localhost:25558") + topic = Topic("/test/unsub", Message) + + result = dict(count=0, event=Event()) + + def callback(msg): + result["count"] += 1 + result["event"].set() + + sub = Subscriber(topic, callback, transport=tx) + pub = Publisher(topic, transport=tx) + + # Subscribe and receive first message + sub.subscribe() + + # Small delay to ensure subscriber is ready + import time + time.sleep(0.1) + + pub.publish(Message(seq=1)) + + received = result["event"].wait(timeout=3) + assert received, "First message not received" + assert result["count"] == 1 + + # Unsubscribe + result["event"].clear() + sub.unsubscribe() + + # Publish second message - should not be received + pub.publish(Message(seq=2)) + + received = result["event"].wait(timeout=1) + assert received is False, "Second message received but it should have been unsubscribed" + assert result["count"] == 1, "Message count should still be 1" + + tx.close() \ No newline at end of file diff --git a/tests/unit/test_zeromq.py b/tests/unit/test_zeromq.py new file mode 100644 index 00000000..622da0b8 --- /dev/null +++ b/tests/unit/test_zeromq.py @@ -0,0 +1,134 @@ +from threading import Event + +from compas_eve import Message +from compas_eve import Publisher +from compas_eve import Subscriber +from compas_eve import Topic +from compas_eve import set_default_transport +from compas_eve.zeromq import ZeroMQTransport + +import pytest + + +def test_zeromq_import(): + """Test that ZeroMQ transport can be imported.""" + assert ZeroMQTransport is not None + + +def test_default_transport_publishing(): + set_default_transport(ZeroMQTransport("inproc://test1")) + event = Event() + topic = Topic("/messages_compas_eve_test/test_default_transport_publishing/", Message) + + Subscriber(topic, lambda m: event.set()).subscribe() + Publisher(topic).publish(Message(done=True)) + + received = event.wait(timeout=3) + assert received, "Message not received" + + +def test_pubsub(): + tx = ZeroMQTransport("inproc://test2") + event = Event() + topic = Topic("/messages_compas_eve_test/test_pubsub/", Message) + + Subscriber(topic, lambda m: event.set(), transport=tx).subscribe() + Publisher(topic, transport=tx).publish(Message(done=True)) + + received = event.wait(timeout=3) + assert received, "Message not received" + + tx.close() + + +def test_two_subs(): + tx = ZeroMQTransport("inproc://test3") + event1 = Event() + event2 = Event() + topic = Topic("/messages_compas_eve_test/test_two_subs/", Message) + + Subscriber(topic, lambda m: event1.set(), transport=tx).subscribe() + Subscriber(topic, lambda m: event2.set(), transport=tx).subscribe() + Publisher(topic, transport=tx).publish(Message(done=True)) + + received1 = event1.wait(timeout=2) + received2 = event2.wait(timeout=2) + assert received1, "Message 1 not received" + assert received2, "Message 2 not received" + + tx.close() + + +def test_unsub(): + tx = ZeroMQTransport("inproc://test4") + topic = Topic("/messages_compas_eve_test/test_unsub/", Message) + + result = dict(count=0, event=Event()) + + def callback(msg): + result["count"] += 1 + result["event"].set() + + pub = Publisher(topic, transport=tx) + sub = Subscriber(topic, callback, transport=tx) + + sub.subscribe() + pub.publish(Message(done=True)) + received = result["event"].wait(timeout=3) + assert received, "First message not received" + assert len(list(tx._local_callbacks.keys())) == 1, "Internal callback reference should have been kept" + + result["event"].clear() + sub.unsubscribe() + pub.publish(Message(done=True)) + + received = result["event"].wait(timeout=1) + assert received is False, "Second message received but it should have been unsubscribed" + assert result["count"] == 1, "Did not unsubscribe properly" + assert len(list(tx._local_callbacks.keys())) == 0, "Internal callback reference should have been released" + + tx.close() + + +def test_message_type_parsing(): + tx = ZeroMQTransport("inproc://test5") + event = Event() + topic = Topic("/messages_compas_eve_test/test_message_type_parsing/", Message) + + result = {} + + def callback(msg): + result["message"] = msg + event.set() + + Subscriber(topic, callback, transport=tx).subscribe() + Publisher(topic, transport=tx).publish(Message(name="Compas Eve", done=True)) + + received = event.wait(timeout=3) + assert received, "Message not received" + assert result["message"].name == "Compas Eve" + assert result["message"].done is True + + tx.close() + + +def test_dict_as_message(): + tx = ZeroMQTransport("inproc://test6") + event = Event() + topic = Topic("/messages_compas_eve_test/test_dict_as_message/", Message) + + result = {} + + def callback(msg): + result["message"] = msg + event.set() + + Subscriber(topic, callback, transport=tx).subscribe() + Publisher(topic, transport=tx).publish({"name": "Compas Eve", "done": True}) + + received = event.wait(timeout=3) + assert received, "Message not received" + assert result["message"].name == "Compas Eve" + assert result["message"].done is True + + tx.close() \ No newline at end of file