refactor: generalize ZMQ pub/sub over message type via MessageCodec#300
refactor: generalize ZMQ pub/sub over message type via MessageCodec#300nv-alicheng merged 4 commits intomainfrom
Conversation
|
MLCommons CLA bot All contributors have signed the MLCommons CLA ✍️ ✅ |
There was a problem hiding this comment.
Code Review
This pull request generalizes the pub/sub transport layer by introducing a generic MessageCodec protocol and refactoring the existing EventRecord-specific publisher and subscriber into generic MessagePublisher[T] and MessageSubscriber[T] classes. The ZMQ implementations were updated to support these generics and now include additional socket configuration options such as conflate and high-water marks. Feedback was provided to broaden the exception handling in the subscriber's read loop to ensure the implementation remains truly generic and adheres to the codec protocol contract.
Replace EventRecord-specific publisher/subscriber classes with generic ZmqMessagePublisher[T] / ZmqMessageSubscriber[T] parameterized by a MessageCodec[T] Protocol. EventRecordCodec preserves existing wire format and decode-error wrapping behavior. Sets up the generic transport that the upcoming MetricsSnapshot publisher will reuse. - protocol.py: drop EventRecordPublisher/Subscriber ABCs; add MessageCodec, MessagePublisher[T], MessageSubscriber[T]. - pubsub.py: rewrite as ZmqMessagePublisher[T]/ZmqMessageSubscriber[T]; expose sndhwm/linger/conflate so future callers (e.g. live snapshots) can choose drop-old vs. delivery-guarantee semantics. - record.py: add EventRecordCodec next to encode/decode helpers. - Update EventPublisherService, EventLoggerService, MetricsAggregatorService and tests to use the generic classes. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
c810acd to
bd5ab85
Compare
Per Gemini review on PR #300: catching only msgspec.DecodeError in MessageSubscriber._on_readable bakes the codec implementation into the supposedly-generic base class. A future codec backed by json, pickle, etc. raises different exception types and would bypass on_decode_error, crashing the reader. - protocol.py: widen the catch back to Exception so the base class makes no assumption about which decoder library a codec uses; drop the now- unused msgspec import. - record.py: tighten EventRecordCodec.on_decode_error to wrap only msgspec.DecodeError and re-raise other exceptions. Preserves the previous behavior parity (only malformed-payload errors become ErrorEventType.GENERIC records; programmer bugs in the decode path still surface). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
arekay-nv
left a comment
There was a problem hiding this comment.
Review Council — Multi-AI Code Review
Reviewed by Codex + Claude | Depth: standard | HEAD: faaff7b
Found 1 issue across 1 file: 0 critical, 0 high, 1 medium, 0 low.
Codex returned no actionable regressions: "The refactor from EventRecord-specific pub/sub classes to generic codec-driven message transport appears internally consistent." Claude flagged one test-coverage gap on the new on_decode_error codec method (see inline comment).
Note (not posted as inline): a few docs/**/DESIGN.md files still reference the old symbol names (encode_event_record, decode_event_record, ZmqEventRecordPublisher, ZmqEventRecordSubscriber). Worth a follow-up doc sweep.
Review Council — Multi-AI Code ReviewReviewed by Codex + Claude | Depth: standard | HEAD: Found 1 issue across 1 file:
Codex verdict
Cross-reviewer agreement: only one finding overall, from Claude. Codex independently concluded the refactor is clean. Follow-up (not posted inline)A few design docs reference the old symbol names — likely a small doc sweep:
Stale references include Inline review: #pullrequestreview-4231671106 🤖 Posted by |
Address PR #300 review feedback: on_decode_error has two distinct branches and neither was exercised. The re-raise branch in particular is the behavior MessageSubscriber._on_readable relies on to surface decode-path bugs — a non-DecodeError must propagate, otherwise it escapes the asyncio reader callback and silently de-registers the subscriber. - test_wraps_msgspec_decode_error_into_generic_error_record: forces a real msgspec.DecodeError through the codec's own decoder (not a hand-constructed exception), then asserts on_decode_error returns a wrapped EventRecord(ErrorEventType.GENERIC, ErrorData(...)). - test_reraises_non_decode_error: passes a ValueError and asserts it propagates unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
EventRecord-specific publisher/subscriber classes with genericZmqMessagePublisher[T]/ZmqMessageSubscriber[T]parameterized by aMessageCodec[T]Protocol.EventRecordCodecpreserves the existing wire format and decode-error wrapping behavior, so all current callers (EventPublisherService,EventLoggerService,MetricsAggregatorService) keep their semantics — they just thread the codec throughsuper().__init__.sndhwm/linger/conflate/rcvhwmknobs so future callers can choose drop-old vs delivery-guarantee semantics per message type.Why
The next step is a
MetricsPublisherthat publishesMetricsSnapshotfor a live TUI consumer. Building it directly on top of the EventRecord-specific pub/sub would duplicate the batching / pending-queue / async-writer machinery, or shoehorn snapshots through an EventRecord-shaped wrapper. Generalizing first is one rename pass + a codec extraction; afterward the metrics publisher is a thin composition.Design context lives at
.cursor_artifacts/zmq_metrics_publisher.md(not part of this PR; just for reference).Behavior preservation
MessageSubscriber._on_readablecatchesmsgspec.DecodeErroronly — same as before.EventRecordCodec.on_decode_errorreturns the sameErrorEventType.GENERICwrapper that the old_on_readableconstructed inline.SNDHWM=0,LINGER=-1,IMMEDIATE=1,RCVHWM=0) are the new parameter defaults.Test plan
tests/unit/transport/test_zmq_pool_transport.py— updated to useZmqMessagePublisher(EventRecordCodec(), ...)tests/unit/async_utils/test_event_publisher.py—CollectingEventSubscriberupdatedtests/unit/transport/,tests/unit/async_utils/test_event_publisher.py,tests/unit/async_utils/services/event_logger/,tests/unit/async_utils/services/metrics_aggregator/(run ininference_endpoint_devcontainer)pre-commit runclean (ruff, ruff-format, mypy, license headers, prettier)🤖 Generated with Claude Code