Skip to content

Commit ba573ba

Browse files
codeSamuraiiCopilot
andcommitted
Fix bytes serialization
Co-authored-by: Copilot <copilot@github.com>
1 parent 57e7dcf commit ba573ba

2 files changed

Lines changed: 22 additions & 4 deletions

File tree

pyfuse/worker/remote.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,18 @@ async def _handle_task(
572572
# If the client cancelled mid-execution, it already stored a cancelled
573573
# result envelope (via Result.cancel) which the client reads first.
574574
await flush()
575-
await backend.send_result(task.task_id, envelope.to_json())
575+
try:
576+
result_json = envelope.to_json()
577+
except Exception as exc:
578+
# Result serialization failed (e.g. unsupported return type).
579+
# Surface the failure as an error envelope rather than letting
580+
# the task hang forever from the client's point of view.
581+
logger.exception(
582+
"Failed to serialize result for task %s", task.task_id,
583+
)
584+
envelope = ResultEnvelope.failure(task.task_id, exc)
585+
result_json = envelope.to_json()
586+
await backend.send_result(task.task_id, result_json)
576587
await backend.notify_result(task.task_id)
577588

578589
_log_task_result(task, envelope, elapsed_ms, worker)

pyfuse/worker/result.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from dataclasses import dataclass
1010
from collections.abc import Generator
1111

12+
from pyfuse.core.task import _TaskEncoder, _resolve
1213
from pyfuse.core.errors import RemoteError, TaskStalled, TaskCancelled, ThrottleError
1314
from pyfuse.core.progress import ProgressInfo
1415
from pyfuse.worker.backends.base import Backend
@@ -58,7 +59,12 @@ def failure(cls, task_id: str, exc: BaseException) -> Self:
5859
)
5960

6061
def to_json(self) -> str:
61-
"""Serialize to JSON string."""
62+
"""Serialize to JSON string.
63+
64+
Result payloads are encoded with the same sentinel-based encoder
65+
used for task arguments, so ``bytes``, ``datetime``, ``Decimal``,
66+
``Path`` etc. round-trip transparently.
67+
"""
6268
d: dict[str, Any] = {
6369
"task_id": self.task_id,
6470
"status": self.status,
@@ -69,16 +75,17 @@ def to_json(self) -> str:
6975
d["error_type"] = self.error_type
7076
d["error_message"] = self.error_message
7177
d["error_traceback"] = self.error_traceback
72-
return json.dumps(d)
78+
return json.dumps(d, cls=_TaskEncoder)
7379

7480
@classmethod
7581
def from_json(cls, raw: str | bytes) -> Self:
7682
"""Deserialize from a JSON string or bytes."""
7783
data = json.loads(raw)
84+
result = _resolve(data.get("result"), {}) if data.get("status") == "ok" else None
7885
return cls(
7986
task_id=data["task_id"],
8087
status=data["status"],
81-
result=data.get("result"),
88+
result=result,
8289
error_type=data.get("error_type"),
8390
error_message=data.get("error_message"),
8491
error_traceback=data.get("error_traceback"),

0 commit comments

Comments
 (0)