Skip to content

Commit 14ccec4

Browse files
committed
Implement Task 8: Advanced Execution Modes
- Add parallel execution support with @parallel marker - Create advanced_modes.py module for parallel execution logic - Modify run_script to support execution modes (sequential/parallel) - Add tests for parallel and sequential execution modes - Add BDD tests for execution modes - Update parser to handle @parallel annotations on blocks - Fix color key casing in verbose output
1 parent 2e4beb1 commit 14ccec4

5 files changed

Lines changed: 442 additions & 54 deletions

File tree

features/modes.feature

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
Feature: Advanced Execution Modes
2+
As a DevOps engineer
3+
I want to execute scripts with different execution modes
4+
So that I can optimize execution time and control execution flow
5+
6+
Scenario: Execute blocks in parallel
7+
Given a script file with the following content:
8+
"""
9+
# @PARALLEL
10+
# @LOCAL
11+
echo "parallel-task-1"
12+
13+
# @LOCAL
14+
echo "parallel-task-2"
15+
"""
16+
When I run the script with mode "parallel"
17+
Then the execution should succeed
18+
And the output should contain "parallel-task-1"
19+
And the output should contain "parallel-task-2"
20+
21+
Scenario: Execute blocks sequentially
22+
Given a script file with the following content:
23+
"""
24+
# @LOCAL
25+
echo "sequential-task-1"
26+
27+
# @LOCAL
28+
echo "sequential-task-2"
29+
"""
30+
When I run the script with mode "sequential"
31+
Then the execution should succeed
32+
And the output should contain "sequential-task-1"
33+
And the output should contain "sequential-task-2"
34+
35+
Scenario: Mixed parallel and sequential blocks
36+
Given a script file with the following content:
37+
"""
38+
# @PARALLEL group1
39+
# @LOCAL
40+
echo "parallel-1"
41+
42+
# @LOCAL
43+
echo "parallel-2"
44+
45+
# @LOCAL
46+
echo "sequential"
47+
"""
48+
When I run the script with mode "parallel"
49+
Then the execution should succeed
50+
And the output should contain "parallel-1"
51+
And the output should contain "parallel-2"
52+
And the output should contain "sequential"

features/steps/shellflow_steps.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -869,6 +869,44 @@ def step_when_run_the_script_with_audit_log_path(context: Context) -> None:
869869
when_run_the_script_with_cli_args(context, "--audit-log", str(audit_log_path), "--jsonl")
870870

871871

872+
@when('I run the script with mode "{mode}"')
873+
def step_when_run_script_with_mode(context: Context, mode: str) -> None:
874+
"""Run the parsed script with a specific execution mode."""
875+
script_content = getattr(context, "script_content", None)
876+
if not script_content:
877+
raise ValueError("No script content set. Did you call the Given step first?")
878+
879+
try:
880+
macros = parse_macros(script_content)
881+
helpers = parse_helpers(script_content)
882+
variables = parse_variables(script_content)
883+
hooks = parse_hooks(script_content)
884+
blocks = parse_script(script_content, macros, helpers, hooks)
885+
servers = parse_server_config(script_content)
886+
except ParseError as error:
887+
context.run_result = None
888+
context.block_results = []
889+
context.stdout = ""
890+
context.stderr = str(error)
891+
context.exit_code = 1
892+
return
893+
894+
with (
895+
mock.patch(
896+
"shellflow.read_ssh_config",
897+
side_effect=lambda host, servers=None: _read_ssh_config_for_context(context, host),
898+
),
899+
mock.patch("shellflow.execute_remote", side_effect=_fake_execute_remote),
900+
):
901+
result = run_script(blocks, servers, verbose=getattr(context, "verbose", False), mode=mode, variables=variables, macros=macros, helpers=helpers, hooks=hooks)
902+
903+
context.run_result = result
904+
context.block_results = result.block_results
905+
context.stdout = "\n".join(block.output for block in result.block_results if block.output)
906+
context.stderr = result.error_message if not result.success else ""
907+
context.exit_code = 0 if result.success else 1
908+
909+
872910
@when("the script is parsed")
873911
def step_when_the_script_is_parsed(context: Context) -> None:
874912
when_the_script_is_parsed(context)

src/advanced_modes.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
"""Advanced execution modes for Shellflow.
2+
3+
This module provides advanced execution modes including parallel execution
4+
and dry-run capabilities.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
import threading
10+
import time
11+
from concurrent.futures import ThreadPoolExecutor, as_completed
12+
from typing import Any
13+
14+
15+
def run_parallel(
16+
blocks,
17+
context,
18+
servers=None,
19+
no_input=False,
20+
verbose=False,
21+
output_tail_lines=20,
22+
run_id="",
23+
total_blocks=0,
24+
):
25+
"""Execute blocks in parallel using a thread pool.
26+
27+
Args:
28+
blocks: List of blocks to execute in parallel.
29+
context: Shared execution context.
30+
servers: SSH server configurations.
31+
no_input: Whether to disable interactive input.
32+
verbose: Whether to enable verbose output.
33+
output_tail_lines: Maximum output lines to show.
34+
run_id: Run identifier for events.
35+
total_blocks: Total number of blocks in the script.
36+
37+
Returns:
38+
List of execution results in the order they completed.
39+
"""
40+
# Import here to avoid circular imports
41+
from shellflow import (
42+
ExecutionResult,
43+
_execute_block_standard,
44+
_finalize_block_result,
45+
_apply_block_exports,
46+
_get_verbose_colors,
47+
)
48+
49+
if not blocks:
50+
return []
51+
52+
# ANSI color codes for verbose output
53+
colors = _get_verbose_colors()
54+
55+
results: list[ExecutionResult] = []
56+
lock = threading.Lock()
57+
58+
def execute_block_worker(block: Block, block_index: int) -> ExecutionResult:
59+
"""Execute a single block and return its result."""
60+
block_id = f"block-{block_index}"
61+
start_time = time.perf_counter()
62+
63+
# Execute the block
64+
result = _execute_block_standard(
65+
block, context, servers, no_input, verbose, block_index, total_blocks,
66+
output_tail_lines, colors, [], run_id
67+
)
68+
69+
result = _finalize_block_result(result, block, block_index, start_time)
70+
result.block_id = block_id
71+
result.block_index = block_index
72+
73+
# Apply exports to shared context (thread-safe)
74+
with lock:
75+
result.exported_env = _apply_block_exports(block, result, context)
76+
# Update shared context
77+
context.last_output = result.output
78+
context.success = result.success
79+
80+
return result
81+
82+
# Execute blocks in parallel
83+
with ThreadPoolExecutor(max_workers=len(blocks)) as executor:
84+
# Submit all tasks
85+
future_to_block = {
86+
executor.submit(execute_block_worker, block, i + 1): (block, i + 1)
87+
for i, block in enumerate(blocks)
88+
}
89+
90+
# Collect results as they complete
91+
for future in as_completed(future_to_block):
92+
block, block_index = future_to_block[future]
93+
try:
94+
result = future.result()
95+
results.append(result)
96+
except Exception as exc:
97+
# Handle any exceptions from the worker thread
98+
error_result = ExecutionResult(
99+
success=False,
100+
output="",
101+
error_message=f"Parallel execution failed: {exc}",
102+
block_index=block_index,
103+
source_line=block.source_line,
104+
)
105+
results.append(error_result)
106+
107+
# Sort results by block index to maintain order
108+
results.sort(key=lambda r: r.block_index)
109+
110+
return results

0 commit comments

Comments
 (0)