Skip to content
Closed
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ Failed tasks are automatically retried up to 3 times with increasing backoff
before the session is saved. Session checkpoints are stored in the
platform-specific application data directory.

### Auto-save tool log

Set `AUTO_SAVE_DIR` and `AUTO_SAVE_INTERVAL` to enable periodic tool-result
logging. Every `AUTO_SAVE_INTERVAL` tool calls, the runner appends an NDJSON
entry to `{AUTO_SAVE_DIR}/auto_save_tool_log.ndjson`. Disabled by default
(`AUTO_SAVE_INTERVAL=0`).

### Error Output

By default, errors are shown as concise one-line messages. Use `--debug` (or
Expand Down Expand Up @@ -429,6 +435,9 @@ confirm:
A sequence of interdependent tasks performed by a set of Agents. Configured through YAML files of `filetype` `taskflow`.
Taskflows support a number of features, and their details can be found [here](doc/GRAMMAR.md).

For long-running audits, `max_consecutive_same_tool` can limit repetitive tool
calls (see [GRAMMAR.md](doc/GRAMMAR.md#max_consecutive_same_tool)).

Example:

```yaml
Expand Down
18 changes: 18 additions & 0 deletions doc/GRAMMAR.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,24 @@ Example:
...
```

### max_consecutive_same_tool

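`max_consecutive_same_tool` sets the number of consecutive calls to the same tool
that triggers loop detection: once a tool has been called that many times in a row,
the task is aborted with a checkpoint. Loop detection is not applied to async tasks.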

- **Omitted or `null`** (default): inherits from the `LOOP_MAX_CONSECUTIVE` environment variable; if that is also unset or `0`, detection is disabled.
- **`0`**: explicitly disables detection for this task, even when `LOOP_MAX_CONSECUTIVE` is set globally.
- **Positive integer**: enables detection at that threshold.

```yaml
- task:
max_consecutive_same_tool: 10
agents:
- seclab_taskflow_agent.personalities.assistant
user_prompt: |
Audit this codebase.
```

### Running templated tasks in a loop

Often we may want to iterate through the same tasks with different inputs. For example, we may want to fetch all the functions from a code base and then analyze each of the functions. This can be done using two consecutive tasks and with the help of the `repeat_prompt` field.
Expand Down
1 change: 1 addition & 0 deletions src/seclab_taskflow_agent/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class TaskDefinition(BaseModel):
env: dict[str, str] = Field(default_factory=dict)
inputs: dict[str, Any] = Field(default_factory=dict)
max_steps: int = 0 # 0 means use the runner default
max_consecutive_same_tool: int | None = None # None = use LOOP_MAX_CONSECUTIVE env; 0 = disabled
uses: str = ""

# async settings (``async`` is a reserved word, aliased)
Expand Down
139 changes: 139 additions & 0 deletions src/seclab_taskflow_agent/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@

__all__ = [
"DEFAULT_MAX_TURNS",
"LoopDetectedError",
"MAX_API_RETRY",
"MAX_RATE_LIMIT_BACKOFF",
"RATE_LIMIT_BACKOFF",
"check_consecutive_tool_loop",
"deploy_task_agents",
"read_tool_log",
"run_main",
"write_auto_save",
]

import asyncio
import contextlib
import json
import logging
import os
Expand Down Expand Up @@ -52,6 +57,90 @@
TASK_RETRY_LIMIT = 3 # Maximum retry attempts for a failed task
TASK_RETRY_BACKOFF = 10 # Initial backoff in seconds between task retries

AUTO_SAVE_LOG_NAME = "auto_save_tool_log.ndjson"


def write_auto_save(
    auto_save_dir: str,
    turn: int,
    tool_name: str,
    result: str,
) -> None:
    """Append one tool-result entry to the auto-save log (NDJSON, append-only).

    The entry is a single JSON object with the keys ``turn``, ``tool`` and
    ``result_preview``, written as one line to the file named by
    ``AUTO_SAVE_LOG_NAME`` inside *auto_save_dir*. The directory is created
    on demand. Writing is best-effort: any failure is logged as a warning
    and never propagated to the caller.

    Args:
        auto_save_dir: Directory holding the log. An empty value means
            auto-save is disabled and the call is a no-op.
        turn: Value recorded under ``turn``.
        tool_name: Tool name recorded under ``tool``.
        result: Tool output; only the first 2000 characters are stored.
            ``None`` is stored as an empty string, and non-string values
            are converted with ``str()`` before truncation.
    """
    # Mirror read_tool_log: an empty directory means "disabled", not an error.
    if not auto_save_dir:
        return
    try:
        if result is None:
            text = ""
        elif isinstance(result, str):
            text = result
        else:
            # Slicing an arbitrary object would raise and drop the entry.
            text = str(result)
        os.makedirs(auto_save_dir, exist_ok=True)
        save_path = os.path.join(auto_save_dir, AUTO_SAVE_LOG_NAME)
        entry = json.dumps({
            "turn": turn,
            "tool": tool_name,
            "result_preview": text[:2000],
        })
        with open(save_path, "a", encoding="utf-8") as f:
            f.write(entry + "\n")
    except Exception as e:
        # Deliberately broad: a logging aid must never break the run.
        logging.warning("Auto-save failed: %s", e)


def read_tool_log(auto_save_dir: str) -> list[dict]:
    """Read the NDJSON auto-save log and return its entries in file order.

    Blank lines, lines that are not valid JSON, and lines whose JSON value
    is not an object are skipped, so every returned element is a ``dict``.
    A missing log file yields an empty list. Any other failure is logged at
    debug level and the entries collected so far are returned.

    Args:
        auto_save_dir: Directory containing the log file named by
            ``AUTO_SAVE_LOG_NAME``. An empty value returns ``[]``.

    Returns:
        The parsed log entries, possibly empty.
    """
    if not auto_save_dir:
        return []
    entries: list[dict] = []
    try:
        path = os.path.join(auto_save_dir, AUTO_SAVE_LOG_NAME)
        # Open directly instead of checking existence first; a missing log
        # simply means nothing has been saved yet.
        with contextlib.suppress(FileNotFoundError), open(path, encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue
                parsed = None
                with contextlib.suppress(json.JSONDecodeError):
                    parsed = json.loads(line)
                # Valid JSON that is not an object (number, list, null)
                # is as malformed for this log as unparsable text.
                if isinstance(parsed, dict):
                    entries.append(parsed)
    except Exception:
        logging.debug("Failed to read auto-save tool log", exc_info=True)
    return entries


class LoopDetectedError(Exception):
    """Signal that the agent is stuck repeating the same tool call.

    Attributes:
        tool_name: Name of the tool that was called repeatedly.
        count: Length of the consecutive-call streak when the error was raised.
    """

    def __init__(self, message: str, tool_name: str, count: int):
        # Only the message goes to the base class, so str(exc) is the message.
        self.tool_name = tool_name
        self.count = count
        super().__init__(message)


def check_consecutive_tool_loop(
    tool_name: str,
    consecutive_name: list[str],
    consecutive_count: list[int],
    threshold: int,
) -> None:
    """Update the same-tool streak and abort once it hits the threshold.

    The streak state lives in two single-element lists that are mutated in
    place, so the caller sees the updated name and count after each call.

    Args:
        tool_name: Name of the tool that just completed.
        consecutive_name: Single-element list holding the tool name of the
            current streak.
        consecutive_count: Single-element list holding the current streak length.
        threshold: Streak length at which the error is raised. Zero or a
            negative value disables the check and leaves the state untouched.

    Raises:
        LoopDetectedError: When the streak length reaches *threshold*.
    """
    # Detection disabled: do not touch the streak state at all.
    if threshold <= 0:
        return

    # A different tool starts a fresh streak; the increment below then
    # brings its length to 1.
    if consecutive_name[0] != tool_name:
        consecutive_name[0] = tool_name
        consecutive_count[0] = 0
    consecutive_count[0] += 1

    streak = consecutive_count[0]
    if streak < threshold:
        return
    raise LoopDetectedError(
        f"{tool_name} called {streak} times consecutively",
        tool_name=tool_name,
        count=streak,
    )


def _resolve_model_config(
available_tools: AvailableTools,
Expand Down Expand Up @@ -467,8 +556,38 @@ async def run_main(

last_mcp_tool_results: list[str] = []

# Auto-save scaffolding: periodically persist tool results to disk.
# Disabled by default (interval=0). Set AUTO_SAVE_DIR and
# AUTO_SAVE_INTERVAL to enable.
_tool_call_counter = [0]
try:
_auto_save_interval = int(os.getenv("AUTO_SAVE_INTERVAL", "0"))
except ValueError:
logging.warning("Invalid AUTO_SAVE_INTERVAL value, defaulting to 0 (disabled)")
_auto_save_interval = 0
_auto_save_dir = os.getenv("AUTO_SAVE_DIR", "")

# Loop detection state
_consecutive_tool_name = [""]
_consecutive_tool_count = [0]
_loop_threshold = [0] # 0=disabled
_loop_is_async = [False]

async def on_tool_end_hook(
    context: RunContextWrapper[TContext],
    agent: Agent[TContext],
    tool: Tool,
    result: str,
) -> None:
    """Record a finished tool call, auto-save periodically, and check for loops.

    Every result is appended to ``last_mcp_tool_results`` and the shared call
    counter is incremented. When auto-save is configured, every
    ``_auto_save_interval``-th call is written to the auto-save log. For
    non-async tasks the consecutive same-tool streak is then checked; if a
    loop is detected, the triggering call is saved (when an auto-save
    directory is set) and the error is re-raised.

    Raises:
        LoopDetectedError: Propagated from ``check_consecutive_tool_loop``.
    """
    last_mcp_tool_results.append(result)
    _tool_call_counter[0] += 1
    calls_so_far = _tool_call_counter[0]

    # The truthiness checks come first so a zero interval never reaches the modulo.
    auto_save_enabled = bool(_auto_save_dir and _auto_save_interval)
    if auto_save_enabled and calls_so_far % _auto_save_interval == 0:
        write_auto_save(_auto_save_dir, calls_so_far, tool.name, result)

    # Loop detection is skipped while the current task runs asynchronously.
    if _loop_is_async[0]:
        return

    try:
        check_consecutive_tool_loop(
            tool.name,
            _consecutive_tool_name,
            _consecutive_tool_count,
            _loop_threshold[0],
        )
    except LoopDetectedError:
        # Persist a short preview of the call that tripped the detector
        # before handing the error to the task loop.
        if _auto_save_dir:
            write_auto_save(_auto_save_dir, _tool_call_counter[0], tool.name, (result or "")[:500])
        raise

async def on_tool_start_hook(context: RunContextWrapper[TContext], agent: Agent[TContext], tool: Tool) -> None:
await render_model_output(f"\n** 🤖🛠️ Tool Call: {tool.name}\n")
Expand Down Expand Up @@ -566,6 +685,18 @@ async def on_handoff_hook(context: RunContextWrapper[TContext], agent: Agent[TCo
async_task = task.async_task
max_concurrent_tasks = task.async_limit

# Configure loop detection for this task
task_loop_val = getattr(task, "max_consecutive_same_tool", None)
if task_loop_val is not None:
_loop_threshold[0] = task_loop_val
else:
try:
_loop_threshold[0] = int(os.getenv("LOOP_MAX_CONSECUTIVE", "0"))
except ValueError:
logging.warning("Invalid LOOP_MAX_CONSECUTIVE value, defaulting to 0 (disabled)")
_loop_threshold[0] = 0
_loop_is_async[0] = async_task

# Render prompt template (skip if repeat_prompt — result not yet available)
if task_prompt and not repeat_prompt:
try:
Expand Down Expand Up @@ -601,6 +732,10 @@ async def run_prompts(async_task: bool = False, max_concurrent_tasks: int = 5) -
task_results: list[Any] = []
semaphore = asyncio.Semaphore(max_concurrent_tasks)
for p_prompt in prompts_to_run:
# Reset loop detection per prompt
_consecutive_tool_name[0] = ""
_consecutive_tool_count[0] = 0

resolved_agents: dict[str, Any] = {}
current_agents = list(agents_list)
if not current_agents:
Expand Down Expand Up @@ -681,6 +816,10 @@ async def _deploy(ra: dict, pp: str) -> bool:
break
except (KeyboardInterrupt, SystemExit):
raise
except LoopDetectedError as exc:
last_task_error = exc
logging.warning(f"Loop detected in task {task_name!r}: {exc}")
break
except (APIConnectionError, APITimeoutError, ConnectionError, TimeoutError) as exc:
last_task_error = exc
remaining = TASK_RETRY_LIMIT - attempt - 1
Expand Down
Loading