Skip to content

Commit ad16c3c

Browse files
fix(dynamic-blocks): tee stdout/stderr so print() still reaches Docke… (#2258)
* fix(dynamic-blocks): tee stdout/stderr so print() still reaches Docker logs * fix(workflows): return 400 on Modal DynamicBlockCodeError Make `inner_error_type` and `inner_error_message` optional on `WorkflowErrorResponse` so errors raised without an `inner_error` (e.g. from the Modal/OCI executors) don't fail pydantic validation and surface as a 500 instead of the intended 400. * return also std out/err on success calls * return on error and on ok with stdout/err data * remove duplicated lgos * undo unnecessary change for returning * fix tabs/spaces --------- Co-authored-by: Paweł Pęczek <146137186+PawelPeczek-Roboflow@users.noreply.github.com>
1 parent 85e2534 commit ad16c3c

5 files changed

Lines changed: 44 additions & 16 deletions

File tree

inference/core/entities/responses/workflows.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,6 @@ class WorkflowErrorResponse(BaseModel):
200200
message: str
201201
error_type: str
202202
context: str
203-
inner_error_type: str
204-
inner_error_message: str
205-
blocks_errors: Optional[List[WorkflowBlockError]]
203+
inner_error_type: Optional[str] = None
204+
inner_error_message: Optional[str] = None
205+
blocks_errors: Optional[List[WorkflowBlockError]] = None

inference/core/workflows/execution_engine/v1/dynamic_blocks/block_scaffolding.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ def run(self, *args, **kwargs) -> BlockResult:
108108
import_lines_count = len(_get_python_code_imports(python_code).splitlines())
109109
try:
110110
with capture_output() as (stdout_buf, stderr_buf):
111+
# stdout/stderr already reach the process streams in real time via the
112+
# tee in capture_output(); buffers are only used to attach context on error.
111113
return run_function(self, *args, **kwargs)
112114
except Exception as error:
113115
raise create_dynamic_block_code_error(

inference/core/workflows/execution_engine/v1/dynamic_blocks/error_utils.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,39 @@
1414

1515

1616
class _ThreadDispatchStream:
17-
"""Stream wrapper that dispatches writes to a per-thread StringIO buffer
18-
when one is active, otherwise falls through to the original stream.
17+
"""Stream wrapper that tees writes into a per-thread StringIO buffer
18+
(when one is active) while still forwarding them to the original stream.
1919
20-
This avoids the need for a global lock: each thread captures to its own
21-
buffer independently, while threads that are not capturing see normal
22-
stdout/stderr behaviour.
20+
Threads that are not capturing see normal stdout/stderr behaviour; threads
21+
that are capturing get both: the buffer keeps an in-memory copy for error
22+
payloads, and the original stream still receives the bytes so ``print()``
23+
output continues to reach Docker / the process stdout.
2324
"""
2425

2526
def __init__(self, original, attr_name: str):
2627
object.__setattr__(self, "_original", original)
2728
object.__setattr__(self, "_attr_name", attr_name)
2829

29-
def _get_target(self):
30-
buf = getattr(_thread_local, self._attr_name, None)
31-
return buf if buf is not None else self._original
30+
def _get_buffer(self):
31+
return getattr(_thread_local, self._attr_name, None)
3232

3333
def write(self, data):
34-
return self._get_target().write(data)
34+
buf = self._get_buffer()
35+
if buf is not None:
36+
try:
37+
buf.write(data)
38+
except Exception:
39+
pass
40+
return self._original.write(data)
3541

3642
def flush(self):
37-
return self._get_target().flush()
43+
buf = self._get_buffer()
44+
if buf is not None:
45+
try:
46+
buf.flush()
47+
except Exception:
48+
pass
49+
return self._original.flush()
3850

3951
def fileno(self):
4052
return self._original.fileno()
@@ -43,7 +55,7 @@ def isatty(self):
4355
return self._original.isatty()
4456

4557
def __getattr__(self, name):
46-
return getattr(self._get_target(), name)
58+
return getattr(self._original, name)
4759

4860

4961
def _install_dispatchers() -> None:

inference/core/workflows/execution_engine/v1/dynamic_blocks/modal_executor.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import base64
99
import json
1010
import os
11+
import sys
1112
from typing import Any, Dict, Optional
1213

1314
import numpy as np
@@ -352,6 +353,15 @@ def execute_remote(
352353
stderr=result.get("stderr"),
353354
)
354355

356+
stdout = result.get("stdout")
357+
stderr = result.get("stderr")
358+
if stdout:
359+
sys.stdout.write(stdout)
360+
sys.stdout.flush()
361+
if stderr:
362+
sys.stderr.write(stderr)
363+
sys.stderr.flush()
364+
355365
# Get the result and deserialize from JSON
356366
json_result = result.get("result", "{}")
357367
return deserialize_for_modal_remote_execution(json_result)

modal/modal_app.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,8 +363,12 @@ class BlockSelf:
363363

364364
json_result = serialize_for_modal_remote_execution(result)
365365

366-
# Return the serialized result with success flag
367-
return {"success": True, "result": json_result}
366+
return {
367+
"success": True,
368+
"result": json_result,
369+
"stdout": stdout_buf.getvalue() or None,
370+
"stderr": stderr_buf.getvalue() or None,
371+
}
368372
except Exception as e:
369373
# On error, capture stdout/stderr and return error details
370374
result = {

0 commit comments

Comments
 (0)