Skip to content

Commit d8fbbbb

Browse files
committed
feat: update shellflow version to 0.4.7 and enhance output handling in execution runner
1 parent e92e81a commit d8fbbbb

6 files changed

Lines changed: 219 additions & 104 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "shellflow"
3-
version = "0.4.6"
3+
version = "0.4.7"
44
description = "A minimal shell script orchestrator with SSH support"
55
readme = "README.md"
66
license = "Apache-2.0"

src/shellflow/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from __future__ import annotations
44

55
from .cli import create_parser
6-
from .config import read_ssh_config
6+
from .config import parse_server_config, parse_variables, read_ssh_config
77
from .constants import (
88
EXIT_EXECUTION_FAILURE,
99
EXIT_PARSE_FAILURE,
@@ -101,6 +101,8 @@
101101
"execute_remote",
102102
"main",
103103
"parse_script",
104+
"parse_server_config",
105+
"parse_variables",
104106
"read_ssh_config",
105107
"run_script",
106108
]

src/shellflow/executor.py

Lines changed: 67 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@
88
import subprocess
99
import threading
1010
import uuid
11+
from contextlib import suppress
1112
from pathlib import Path
12-
from typing import Any, Callable
13+
from typing import TYPE_CHECKING, Any
14+
15+
if TYPE_CHECKING:
16+
from collections.abc import Callable
1317

1418
from .constants import TRACE_MARKER
1519
from .models import Block, BlockExecutor, CommandLog, ExecutionContext, ExecutionResult, SSHConfig
@@ -316,6 +320,7 @@ def _run_remote_subprocess(
316320
*,
317321
timeout_seconds: int | None,
318322
on_command: Callable[[str], None] | None = None,
323+
on_output: Callable[[str], None] | None = None,
319324
) -> tuple[str, str, int, bool, bool]:
320325
"""Run an SSH subprocess and preserve partial output on timeout or interruption."""
321326
process = subprocess.Popen(
@@ -330,6 +335,7 @@ def _run_remote_subprocess(
330335
input_text=remote_script,
331336
timeout_seconds=timeout_seconds,
332337
on_command=on_command,
338+
on_output=on_output,
333339
)
334340

335341

@@ -418,6 +424,7 @@ def execute_local_traced(
418424
context: ExecutionContext,
419425
no_input: bool = False,
420426
on_command: Callable[[str], None] | None = None,
427+
on_output: Callable[[str], None] | None = None,
421428
) -> ExecutionResult:
422429
"""Execute a local block while preserving per-command trace logs."""
423430
if not block.commands:
@@ -453,6 +460,7 @@ def execute_local_traced(
453460
input_text=input_text,
454461
timeout_seconds=block.timeout_seconds,
455462
on_command=on_command,
463+
on_output=on_output,
456464
)
457465
cleaned_stderr = _strip_trace_markers(stderr).strip()
458466
command_logs = _parse_grouped_trace_output(
@@ -505,6 +513,7 @@ def execute_remote(
505513
no_input: bool = False,
506514
servers: dict[str, dict[str, str]] | None = None,
507515
on_command: Callable[[str], None] | None = None,
516+
on_output: Callable[[str], None] | None = None,
508517
) -> ExecutionResult:
509518
"""Execute a remote block via SSH.
510519
@@ -580,6 +589,7 @@ def execute_remote(
580589
remote_script,
581590
timeout_seconds=block.timeout_seconds,
582591
on_command=on_command,
592+
on_output=on_output,
583593
)
584594
cleaned_stdout = _strip_trace_markers(stdout)
585595
cleaned_stderr = _strip_trace_markers(stderr)
@@ -779,6 +789,7 @@ def _run_traced_subprocess(
779789
input_text: str | None,
780790
timeout_seconds: int | None,
781791
on_command: Callable[[str], None] | None = None,
792+
on_output: Callable[[str], None] | None = None,
782793
) -> tuple[str, str, int, bool, bool]:
783794
"""Read traced stdout/stderr streams while surfacing live command markers."""
784795
stdout_chunks: list[str] = []
@@ -789,25 +800,16 @@ def _run_traced_subprocess(
789800
if stdout_stream is None or stderr_stream is None:
790801
raise subprocess.SubprocessError("traced subprocess did not expose stdout/stderr pipes")
791802

792-
def read_stdout() -> None:
793-
try:
794-
for line in iter(stdout_stream.readline, ""):
795-
stdout_chunks.append(line)
796-
command = _extract_trace_command_line(line)
797-
if command is not None and on_command is not None:
798-
on_command(command)
799-
finally:
800-
stdout_stream.close()
801-
802-
def read_stderr() -> None:
803-
try:
804-
for line in iter(stderr_stream.readline, ""):
805-
stderr_chunks.append(line)
806-
finally:
807-
stderr_stream.close()
808-
809-
stdout_thread = threading.Thread(target=read_stdout, daemon=True)
810-
stderr_thread = threading.Thread(target=read_stderr, daemon=True)
803+
stdout_thread = threading.Thread(
804+
target=_read_traced_stdout_stream,
805+
args=(stdout_stream, stdout_chunks, on_command, on_output),
806+
daemon=True,
807+
)
808+
stderr_thread = threading.Thread(
809+
target=_read_traced_output_stream,
810+
args=(stderr_stream, stderr_chunks, on_output),
811+
daemon=True,
812+
)
811813
stdout_thread.start()
812814
stderr_thread.start()
813815

@@ -816,14 +818,7 @@ def read_stderr() -> None:
816818
stdin_stream = process.stdin
817819

818820
try:
819-
if stdin_stream is not None:
820-
if input_text is not None:
821-
try:
822-
stdin_stream.write(input_text)
823-
except BrokenPipeError:
824-
pass
825-
stdin_stream.close()
826-
821+
_write_process_input(stdin_stream, input_text)
827822
process.wait(timeout=timeout_seconds)
828823
except subprocess.TimeoutExpired:
829824
timed_out = True
@@ -853,6 +848,50 @@ def read_stderr() -> None:
853848
)
854849

855850

851+
def _read_traced_stdout_stream(
852+
stdout_stream: Any,
853+
stdout_chunks: list[str],
854+
on_command: Callable[[str], None] | None,
855+
on_output: Callable[[str], None] | None,
856+
) -> None:
857+
"""Read traced stdout, separating command markers from regular output."""
858+
try:
859+
for line in iter(stdout_stream.readline, ""):
860+
stdout_chunks.append(line)
861+
command = _extract_trace_command_line(line)
862+
if command is not None and on_command is not None:
863+
on_command(command)
864+
elif command is None and on_output is not None:
865+
on_output(line)
866+
finally:
867+
stdout_stream.close()
868+
869+
870+
def _read_traced_output_stream(
871+
stream: Any,
872+
chunks: list[str],
873+
on_output: Callable[[str], None] | None,
874+
) -> None:
875+
"""Read a traced process output stream and forward raw chunks."""
876+
try:
877+
for line in iter(stream.readline, ""):
878+
chunks.append(line)
879+
if on_output is not None:
880+
on_output(line)
881+
finally:
882+
stream.close()
883+
884+
885+
def _write_process_input(stdin_stream: Any, input_text: str | None) -> None:
886+
"""Write the traced script to stdin, tolerating early process exits."""
887+
if stdin_stream is None:
888+
return
889+
if input_text is not None:
890+
with suppress(BrokenPipeError):
891+
stdin_stream.write(input_text)
892+
stdin_stream.close()
893+
894+
856895
def _parse_grouped_trace_output(
857896
combined_output: str,
858897
*,

src/shellflow/runner.py

Lines changed: 70 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from __future__ import annotations
44

5+
import threading
56
import time
67
import uuid
78
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -809,6 +810,44 @@ def _print_live_command_status(command: str, command_index: int, total_commands:
809810
print(f"> [{command_index}/{total_commands}] {command}", flush=True)
810811

811812

813+
class _LiveCommandOutputPrinter:
814+
"""Print each traced command's buffered output before the next command starts."""
815+
816+
def __init__(self, total_commands: int, output_tail_lines: int) -> None:
817+
self.total_commands = total_commands
818+
self.output_tail_lines = output_tail_lines
819+
self.command_count = 0
820+
self._output_chunks: list[str] = []
821+
self._lock = threading.Lock()
822+
823+
def start_command(self, command: str) -> None:
824+
"""Start a command section, flushing the previous command's output first."""
825+
with self._lock:
826+
if self.command_count:
827+
self._flush_locked()
828+
self.command_count += 1
829+
_print_live_command_status(command, self.command_count, self.total_commands)
830+
831+
def append_output(self, chunk: str) -> None:
832+
"""Buffer output for the active traced command."""
833+
if not chunk:
834+
return
835+
with self._lock:
836+
self._output_chunks.append(chunk)
837+
838+
def finish(self) -> None:
839+
"""Flush the final command's output after the traced process exits."""
840+
with self._lock:
841+
if self.command_count:
842+
self._flush_locked()
843+
844+
def _flush_locked(self) -> None:
845+
output = "".join(self._output_chunks).strip()
846+
self._output_chunks = []
847+
if output:
848+
print(_truncate_output_lines(output, self.output_tail_lines), flush=True)
849+
850+
812851
def _group_blocks_for_parallel_execution(blocks: list[Block]) -> list[list[Block]]:
813852
"""Group blocks for parallel execution based on parallel annotations.
814853
@@ -921,41 +960,41 @@ def _execute_remote_block_sequential(
921960
DIM = ANSI_DIM
922961
RESET = ANSI_RESET
923962

924-
live_command_index = 0
925-
926-
def on_command(command: str) -> None:
927-
nonlocal live_command_index
928-
live_command_index += 1
929-
_print_live_command_status(command, live_command_index, len(commands_to_execute))
963+
live_printer = _LiveCommandOutputPrinter(len(commands_to_execute), output_tail_lines) if verbose else None
930964

931965
result = executor_module.execute_remote(
932966
block,
933967
context,
934968
ssh_config=None,
935969
no_input=no_input,
936970
servers=servers,
937-
on_command=on_command if verbose else None,
971+
on_command=live_printer.start_command if live_printer else None,
972+
on_output=live_printer.append_output if live_printer else None,
938973
)
939974

940975
if verbose:
976+
if live_printer:
977+
live_printer.finish()
978+
941979
# Assign actual command names to parsed logs
942980
if result.command_logs:
943981
for i, cl in enumerate(result.command_logs):
944982
if cl.command == "<remote-command>" and i < len(commands_to_execute):
945983
cl.command = commands_to_execute[i]
946984

947-
# Print each command with its output
948-
if result.command_logs:
949-
_print_command_logs(result.command_logs, output_tail_lines)
950-
if result.output and not any(command_log.output for command_log in result.command_logs):
951-
print(_truncate_output_lines(result.output, output_tail_lines))
952-
else:
953-
# Fallback: no command logs, just show commands
954-
for cmd in commands_to_execute:
955-
print(f"{DIM}$ {cmd}{RESET}")
956-
if result.output:
957-
truncated = _truncate_output_lines(result.output, output_tail_lines)
958-
print(truncated)
985+
# Print grouped logs only when no live trace markers were observed.
986+
if not (live_printer and live_printer.command_count):
987+
if result.command_logs:
988+
_print_command_logs(result.command_logs, output_tail_lines)
989+
if result.output and not any(command_log.output for command_log in result.command_logs):
990+
print(_truncate_output_lines(result.output, output_tail_lines))
991+
else:
992+
# Fallback: no command logs, just show commands
993+
for cmd in commands_to_execute:
994+
print(f"{DIM}$ {cmd}{RESET}")
995+
if result.output:
996+
truncated = _truncate_output_lines(result.output, output_tail_lines)
997+
print(truncated)
959998

960999
context.last_output = result.output
9611000
context.success = result.success
@@ -1006,27 +1045,27 @@ def _execute_local_block_sequential(
10061045
RED = ANSI_RED
10071046
RESET = ANSI_RESET
10081047

1009-
live_command_index = 0
1010-
1011-
def on_command(command: str) -> None:
1012-
nonlocal live_command_index
1013-
live_command_index += 1
1014-
_print_live_command_status(command, live_command_index, len(commands_to_execute))
1048+
live_printer = _LiveCommandOutputPrinter(len(commands_to_execute), output_tail_lines) if verbose else None
10151049

10161050
result = execute_local_traced(
10171051
block,
10181052
context,
10191053
no_input=no_input,
1020-
on_command=on_command if verbose else None,
1054+
on_command=live_printer.start_command if live_printer else None,
1055+
on_output=live_printer.append_output if live_printer else None,
10211056
)
10221057

10231058
if verbose:
1024-
if result.command_logs:
1025-
_print_command_logs(result.command_logs, output_tail_lines)
1026-
if result.output and not any(command_log.output for command_log in result.command_logs):
1059+
if live_printer:
1060+
live_printer.finish()
1061+
1062+
if not (live_printer and live_printer.command_count):
1063+
if result.command_logs:
1064+
_print_command_logs(result.command_logs, output_tail_lines)
1065+
if result.output and not any(command_log.output for command_log in result.command_logs):
1066+
print(_truncate_output_lines(result.output, output_tail_lines))
1067+
elif result.output:
10271068
print(_truncate_output_lines(result.output, output_tail_lines))
1028-
elif result.output:
1029-
print(_truncate_output_lines(result.output, output_tail_lines))
10301069

10311070
# Print exit code if failed
10321071
if not result.success and verbose:

0 commit comments

Comments
 (0)