|
15 | 15 | """ |
16 | 16 |
|
17 | 17 | import os |
| 18 | +import pickle |
18 | 19 | import threading |
19 | 20 | import time |
20 | 21 | import traceback |
@@ -110,7 +111,11 @@ def recv_json(self, flags: int = 0): |
110 | 111 | try: |
111 | 112 | # receive from socket |
112 | 113 | msg = self.socket.recv(flags=flags) |
113 | | - data_dict = self.socket._deserialize(msg, lambda buf: jsonapi.loads(buf)) |
| 114 | + try: |
| 115 | + data_dict = self.socket._deserialize(msg, lambda buf: jsonapi.loads(buf)) |
| 116 | + except (UnicodeDecodeError, ValueError) as e: |
| 117 | + llm_logger.warning(f"recv_json decode failed, msg={msg}, err={e}") |
| 118 | + raise |
114 | 119 |
|
115 | 120 | # collect zmq recv metrics |
116 | 121 | _zmq_metrics_stats.msg_bytes_recv_total += len(msg) |
@@ -152,7 +157,11 @@ def recv_pyobj(self, flags: int = 0): |
152 | 157 | _zmq_metrics_stats = ZMQMetricsStats() |
153 | 158 | self._ensure_socket() |
154 | 159 | data_bytes = self.socket.recv(flags=flags) |
155 | | - envelope = ForkingPickler.loads(data_bytes) |
| 160 | + try: |
| 161 | + envelope = ForkingPickler.loads(data_bytes) |
| 162 | + except (UnicodeDecodeError, ValueError, pickle.UnpicklingError) as e: |
| 163 | + llm_logger.warning(f"recv_pyobj decode failed, msg={data_bytes}, err={e}") |
| 164 | + raise |
156 | 165 | if isinstance(envelope, dict): |
157 | 166 | if "__meta" in envelope and "send_ts" in envelope["__meta"]: |
158 | 167 | _zmq_metrics_stats.msg_recv_total += 1 |
@@ -539,7 +548,12 @@ def recv_control_cmd(self): |
539 | 548 | """ |
540 | 549 | self._ensure_socket() |
541 | 550 | try: |
542 | | - client, _, task_data = self.socket.recv_multipart(flags=zmq.NOBLOCK) |
| 551 | + frames = self.socket.recv_multipart(flags=zmq.NOBLOCK) |
| 552 | + if len(frames) < 2: |
| 553 | + llm_logger.warning(f"recv_control_cmd: unexpected frame count {len(frames)}, dropping message") |
| 554 | + return None |
| 555 | + client = frames[0] |
| 556 | + task_data = frames[-1] |
543 | 557 | task = msgpack.unpackb(task_data) |
544 | 558 | task_id_str = task["task_id"] |
545 | 559 | except zmq.Again: |
@@ -577,6 +591,8 @@ def close(self): |
577 | 591 | llm_logger.info("ZMQ server is closing connection...") |
578 | 592 | try: |
579 | 593 | if self.socket is not None and not self.socket.closed: |
| 594 | + if self.address: |
| 595 | + self.socket.unbind(self.address) |
580 | 596 | self.socket.close() |
581 | 597 | if not self.context.closed: |
582 | 598 | self.context.term() |
|
0 commit comments