Skip to content
100 changes: 85 additions & 15 deletions comfy_cli/command/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,73 @@
workspace_manager = WorkspaceManager()


def is_ui_workflow(workflow) -> bool:
return isinstance(workflow, dict) and "nodes" in workflow and "links" in workflow


def _validate_api_workflow(workflow):
"""Return the workflow dict if it has the shape of API format, else None."""
if not isinstance(workflow, dict) or not workflow:
return None
node = workflow[next(iter(workflow))]
if not isinstance(node, dict) or "class_type" not in node:
return None
return workflow
Comment thread
bigcat88 marked this conversation as resolved.


def load_api_workflow(file: str):
with open(file, encoding="utf-8") as f:
workflow = json.load(f)
# Check for litegraph properties to ensure this isnt a UI workflow file
if "nodes" in workflow and "links" in workflow:
return None
if is_ui_workflow(workflow):
return None
return _validate_api_workflow(workflow)


class WorkflowConverterUnavailable(Exception):
"""The running ComfyUI server doesn't expose /workflow/convert."""

# Try validating the first entry to ensure it has a node class property
node_id = next(iter(workflow))
node = workflow[node_id]
if "class_type" not in node:
return None

return workflow
def convert_ui_workflow_via_server(workflow: dict, host: str, port: int, timeout: int) -> dict:
"""POST a UI-format workflow to the server's /workflow/convert and return API-format JSON.

Raises WorkflowConverterUnavailable if the server doesn't expose the endpoint.
Raises typer.Exit on other conversion failures.
"""
url = f"http://{host}:{port}/workflow/convert"
req = request.Request(url, json.dumps(workflow).encode("utf-8"))
req.add_header("Content-Type", "application/json")
try:
resp = request.urlopen(req, timeout=timeout)
except urllib.error.HTTPError as e:
if e.code in (404, 405):
raise WorkflowConverterUnavailable() from e
body = e.read().decode("utf-8", errors="replace").strip()
pprint(f"[bold red]Workflow conversion failed (HTTP {e.code}): {body[:500]}[/bold red]")
raise typer.Exit(code=1) from e
except urllib.error.URLError as e:
pprint(f"[bold red]Workflow conversion failed: {e.reason}[/bold red]")
raise typer.Exit(code=1) from e
try:
converted = json.loads(resp.read())
except json.JSONDecodeError as e:
pprint("[bold red]Workflow conversion failed: server returned invalid JSON[/bold red]")
raise typer.Exit(code=1) from e
if not isinstance(converted, dict):
pprint("[bold red]Workflow conversion failed: expected a JSON object[/bold red]")
raise typer.Exit(code=1)
return converted

Comment thread
coderabbitai[bot] marked this conversation as resolved.

def _print_converter_unavailable_help() -> None:
pprint(
"[bold red]UI-format workflow detected, but the running ComfyUI server[/bold red]\n"
"[bold red]doesn't expose a /workflow/convert endpoint to convert it to API format.[/bold red]\n"
"\n"
"[yellow]Workarounds:[/yellow]\n"
"[yellow] * Install a custom node that adds /workflow/convert on the server[/yellow]\n"
"[yellow] * Or, in the ComfyUI frontend, enable Dev Mode under Settings and use[/yellow]\n"
"[yellow] 'Workflow > Export (API)' to save your workflow as API format[/yellow]"
)


def execute(workflow: str, host, port, wait=True, verbose=False, local_paths=False, timeout=30):
Expand All @@ -44,16 +97,33 @@ def execute(workflow: str, host, port, wait=True, verbose=False, local_paths=Fal
)
raise typer.Exit(code=1)

workflow = load_api_workflow(workflow)

if not workflow:
pprint("[bold red]Specified workflow does not appear to be an API workflow json file[/bold red]")
raise typer.Exit(code=1)

if not check_comfy_server_running(port, host):
pprint(f"[bold red]ComfyUI not running on specified address ({host}:{port})[/bold red]")
raise typer.Exit(code=1)

try:
with open(workflow_name, encoding="utf-8") as f:
raw_workflow = json.load(f)
except OSError as e:
pprint(f"[bold red]Unable to read workflow file: {e}[/bold red]")
raise typer.Exit(code=1) from e
except json.JSONDecodeError as e:
pprint(f"[bold red]Specified workflow file is not valid JSON: {e}[/bold red]")
raise typer.Exit(code=1) from e

Comment thread
coderabbitai[bot] marked this conversation as resolved.
if is_ui_workflow(raw_workflow):
pprint("[yellow]Detected UI-format workflow, converting via server's /workflow/convert...[/yellow]")
try:
workflow = convert_ui_workflow_via_server(raw_workflow, host, port, timeout)
except WorkflowConverterUnavailable:
_print_converter_unavailable_help()
raise typer.Exit(code=1)
else:
workflow = _validate_api_workflow(raw_workflow)
if not workflow:
pprint("[bold red]Specified workflow does not appear to be an API workflow json file[/bold red]")
raise typer.Exit(code=1)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

progress = None
start = time.time()
if wait:
Expand Down
197 changes: 196 additions & 1 deletion tests/comfy_cli/command/test_run.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
import io
import json
import os
import tempfile
import urllib.error
from unittest.mock import MagicMock, patch

import pytest
import typer
from websocket import WebSocketException, WebSocketTimeoutException

from comfy_cli.command.run import WorkflowExecution, execute, load_api_workflow
from comfy_cli.command.run import (
WorkflowConverterUnavailable,
WorkflowExecution,
convert_ui_workflow_via_server,
execute,
is_ui_workflow,
load_api_workflow,
)


@pytest.fixture
Expand Down Expand Up @@ -80,6 +89,86 @@ def test_rejects_invalid_node(self):
assert result is None


class TestIsUiWorkflow:
def test_detects_ui_workflow(self):
assert is_ui_workflow({"nodes": [{"id": 1}], "links": []})

def test_rejects_api_workflow(self):
assert not is_ui_workflow({"1": {"class_type": "X", "inputs": {}}})

def test_rejects_non_dict(self):
assert not is_ui_workflow(["nodes", "links"])
assert not is_ui_workflow(None)

def test_requires_both_keys(self):
assert not is_ui_workflow({"nodes": []})
assert not is_ui_workflow({"links": []})


def _make_http_error(code: int, body: bytes = b"") -> urllib.error.HTTPError:
return urllib.error.HTTPError(
url="http://127.0.0.1:8188/workflow/convert",
code=code,
msg=f"HTTP {code}",
hdrs=None,
fp=io.BytesIO(body),
)


class TestConvertUiWorkflowViaServer:
UI = {"nodes": [{"id": 1, "type": "X"}], "links": []}
CONVERTED = {"1": {"class_type": "X", "inputs": {}}}

def test_returns_api_format_on_success(self):
mock_resp = MagicMock()
mock_resp.read.return_value = json.dumps(self.CONVERTED).encode()
with patch("comfy_cli.command.run.request.urlopen", return_value=mock_resp) as mock_open:
result = convert_ui_workflow_via_server(self.UI, "127.0.0.1", 8188, timeout=30)

assert result == self.CONVERTED
sent_req = mock_open.call_args[0][0]
assert sent_req.full_url == "http://127.0.0.1:8188/workflow/convert"
assert json.loads(sent_req.data) == self.UI

@pytest.mark.parametrize("code", [404, 405])
def test_raises_unavailable_on_missing_endpoint(self, code):
with patch("comfy_cli.command.run.request.urlopen", side_effect=_make_http_error(code)):
with pytest.raises(WorkflowConverterUnavailable):
convert_ui_workflow_via_server(self.UI, "127.0.0.1", 8188, timeout=30)

def test_raises_typer_exit_on_server_error(self):
err = _make_http_error(500, b"conversion blew up")
with patch("comfy_cli.command.run.request.urlopen", side_effect=err):
with pytest.raises(typer.Exit) as exc_info:
convert_ui_workflow_via_server(self.UI, "127.0.0.1", 8188, timeout=30)
assert exc_info.value.exit_code == 1

def test_raises_typer_exit_on_network_error(self):
with patch(
"comfy_cli.command.run.request.urlopen",
side_effect=urllib.error.URLError("Connection refused"),
):
with pytest.raises(typer.Exit) as exc_info:
convert_ui_workflow_via_server(self.UI, "127.0.0.1", 8188, timeout=30)
assert exc_info.value.exit_code == 1

def test_raises_typer_exit_on_invalid_json(self):
mock_resp = MagicMock()
mock_resp.read.return_value = b"<html>not json</html>"
with patch("comfy_cli.command.run.request.urlopen", return_value=mock_resp):
with pytest.raises(typer.Exit) as exc_info:
convert_ui_workflow_via_server(self.UI, "127.0.0.1", 8188, timeout=30)
assert exc_info.value.exit_code == 1

def test_raises_typer_exit_on_non_object_response(self):
mock_resp = MagicMock()
mock_resp.read.return_value = b'["not", "an", "object"]'
with patch("comfy_cli.command.run.request.urlopen", return_value=mock_resp):
with pytest.raises(typer.Exit) as exc_info:
convert_ui_workflow_via_server(self.UI, "127.0.0.1", 8188, timeout=30)
assert exc_info.value.exit_code == 1


class TestWatchExecution:
def test_successful_execution(self, mock_execution):
prompt_id = "test-prompt"
Expand Down Expand Up @@ -259,6 +348,55 @@ def test_file_not_found_exits(self):
execute("/nonexistent/workflow.json", host="127.0.0.1", port=8188)
assert exc_info.value.exit_code == 1

def test_rejects_invalid_workflow_format(self):
bad = {"1": {"no_class_type_here": "X"}}
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
json.dump(bad, f)
f.flush()
path = f.name
try:
with patch("comfy_cli.command.run.check_comfy_server_running", return_value=True):
with pytest.raises(typer.Exit) as exc_info:
execute(path, host="127.0.0.1", port=8188)
assert exc_info.value.exit_code == 1
finally:
os.unlink(path)

def test_rejects_malformed_json(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
f.write("{ this is not valid json")
f.flush()
path = f.name
try:
with patch("comfy_cli.command.run.check_comfy_server_running", return_value=True):
with pytest.raises(typer.Exit) as exc_info:
execute(path, host="127.0.0.1", port=8188)
assert exc_info.value.exit_code == 1
finally:
os.unlink(path)

def test_rejects_unreadable_file(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
f.write("{}")
path = f.name
try:
real_open = open

def fake_open(file, *args, **kwargs):
if file == path:
raise PermissionError(13, "Permission denied", path)
return real_open(file, *args, **kwargs)

with (
patch("comfy_cli.command.run.check_comfy_server_running", return_value=True),
patch("builtins.open", side_effect=fake_open),
):
with pytest.raises(typer.Exit) as exc_info:
execute(path, host="127.0.0.1", port=8188)
assert exc_info.value.exit_code == 1
finally:
os.unlink(path)

def test_progress_stopped_on_error(self, workflow_file):
with (
patch("comfy_cli.command.run.check_comfy_server_running", return_value=True),
Expand All @@ -274,3 +412,60 @@ def test_progress_stopped_on_error(self, workflow_file):
with pytest.raises(typer.Exit):
execute(workflow_file, host="127.0.0.1", port=8188, wait=True, timeout=30)
mock_progress.stop.assert_called()


class TestExecuteUiWorkflow:
UI = {"nodes": [{"id": 1, "type": "X"}], "links": []}
CONVERTED = {"1": {"class_type": "EmptyLatentImage", "inputs": {"width": 64, "height": 64, "batch_size": 1}}}

@pytest.fixture
def ui_workflow_file(self):
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
json.dump(self.UI, f)
f.flush()
path = f.name
yield path
os.unlink(path)

def test_ui_workflow_is_converted_then_executed(self, ui_workflow_file):
mock_resp = MagicMock()
mock_resp.read.return_value = json.dumps(self.CONVERTED).encode()

with (
patch("comfy_cli.command.run.check_comfy_server_running", return_value=True),
patch("comfy_cli.command.run.request.urlopen", return_value=mock_resp) as mock_open,
patch("comfy_cli.command.run.ExecutionProgress"),
patch("comfy_cli.command.run.WorkflowExecution") as MockExec,
):
mock_exec = MagicMock()
MockExec.return_value = mock_exec
mock_exec.outputs = []

execute(ui_workflow_file, host="127.0.0.1", port=8188, wait=True, timeout=30)

sent_req = mock_open.call_args[0][0]
assert sent_req.full_url == "http://127.0.0.1:8188/workflow/convert"
assert MockExec.call_args.args[0] == self.CONVERTED
mock_exec.queue.assert_called_once()

@pytest.mark.parametrize("code", [404, 405])
def test_ui_workflow_exits_when_endpoint_missing(self, ui_workflow_file, code):
with (
patch("comfy_cli.command.run.check_comfy_server_running", return_value=True),
patch("comfy_cli.command.run.request.urlopen", side_effect=_make_http_error(code)),
patch("comfy_cli.command.run.WorkflowExecution") as MockExec,
):
with pytest.raises(typer.Exit) as exc_info:
execute(ui_workflow_file, host="127.0.0.1", port=8188, wait=True, timeout=30)
assert exc_info.value.exit_code == 1
MockExec.assert_not_called()

def test_ui_workflow_exits_when_server_not_running(self, ui_workflow_file):
with (
patch("comfy_cli.command.run.check_comfy_server_running", return_value=False),
patch("comfy_cli.command.run.request.urlopen") as mock_open,
):
with pytest.raises(typer.Exit) as exc_info:
execute(ui_workflow_file, host="127.0.0.1", port=8188)
assert exc_info.value.exit_code == 1
mock_open.assert_not_called()
Loading