Skip to content

Commit 25a76c8

Browse files
authored
Resume from stale batches (#155)
* Add stale batch retry logic * Bump black * bump version v26.10.0 -> v26.15.0
1 parent cd398aa commit 25a76c8

9 files changed

Lines changed: 635 additions & 251 deletions

File tree

.pre-commit-config.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ repos:
3232
hooks:
3333
- id: prettier
3434
- repo: https://github.com/psf/black
35-
rev: 26.3.0
35+
rev: 26.3.1
3636
hooks:
3737
- id: black
3838
args:
@@ -41,6 +41,10 @@ repos:
4141
rev: v2.2.2
4242
hooks:
4343
- id: codespell
44+
exclude: >
45+
(?x)^(
46+
.*/pgp\.py
47+
)$
4448
args:
4549
- --ignore-words-list=
4650
- --skip="./.*,*.csv,*.json"

AGENTS.md

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
# AGENTS.md — Open Task Framework (OTF)
2+
3+
This file is written for autonomous agents and maintainers who will modify, test, and extend the Open Task Framework codebase; it provides focused, actionable context so automated systems can make safe, verifiable changes.
4+
5+
## Table of contents
6+
7+
- [AGENTS.md — Open Task Framework (OTF)](#agentsmd--open-task-framework-otf)
8+
- [Table of contents](#table-of-contents)
9+
- [Quick scan](#quick-scan)
10+
- [High-level summary](#high-level-summary)
11+
- [Architecture and main components](#architecture-and-main-components)
12+
- [Contracts and data shapes — precise](#contracts-and-data-shapes--precise)
13+
- [Variable resolution \& templates](#variable-resolution--templates)
14+
- [Developer and agent workflow — run / test / iterate](#developer-and-agent-workflow--run--test--iterate)
15+
- [Concrete examples (copy-and-paste)](#concrete-examples-copy-and-paste)
16+
- [Tests, debugging, and logs](#tests-debugging-and-logs)
17+
- [CI and release pointers](#ci-and-release-pointers)
18+
- [Best practices for automated agents (rules)](#best-practices-for-automated-agents-rules)
19+
- [Where to look for related code](#where-to-look-for-related-code)
20+
- [Change summary and contact](#change-summary-and-contact)
21+
22+
## Quick scan
23+
24+
- Entry point(s): `src/opentaskpy/taskrun.py`, `src/opentaskpy/taskhandlers/taskhandler.py`
25+
- Schemas: `src/opentaskpy/config/schemas/` (validation source of truth)
26+
- Remote handlers: `src/opentaskpy/remotehandlers/` (SSH/SFTP/local/email/dummy)
27+
- Plugins: `src/opentaskpy/plugins/` (lookup family)
28+
- Run unit tests: `pytest tests/ -q`
29+
30+
## High-level summary
31+
32+
Open Task Framework is a modular Python framework that validates and runs tasks defined as JSON documents. Tasks describe either an execution (run a command) or a transfer (move files). The framework uses pluggable remote handlers (execution and transfer) to support protocols like SSH, SFTP, WinRM, and cloud storage services.
33+
34+
Key responsibilities:
35+
36+
- Validate task payloads against JSON schemas.
37+
- Orchestrate execution and transfer flows via `taskhandler` components.
38+
- Provide well-encapsulated protocol handlers: concrete classes implement a consistent handler interface so the taskhandler layer can be protocol-agnostic.
39+
- Provide test fixtures (unit and integration) to verify both logic and environment interactions.
40+
41+
## Architecture and main components
42+
43+
1. Core package: `src/opentaskpy`
44+
45+
- `taskhandler` — central orchestration logic: accepts a validated task, decides whether to call execution or transfer workflow, orchestrates staging and cleanup, and returns standardized results.
46+
- `remotehandlers` — contains abstract base classes and concrete implementations. Expect classes following the naming convention: `*Transfer` and `*Execution`.
47+
- `config/schemas` — JSON schemas that the code uses to validate task payloads. Schemas are authoritative; runtime assumes inputs match them.
48+
- `otflogging` — logging helpers used across the project for consistent log formatting and task-scoped contexts.
49+
50+
2. Tests and fixtures:
51+
52+
- `tests/` — pytest test suite with unit tests (fast) and integration tests (may require docker-compose fixtures).
53+
- `test/` — helper scripts and docker-compose configurations used to stand up test services (sshd, mock services). Look for `createTestFiles.sh`, `createTestDirectories.sh`, and `setupSSHKeys.sh`.
54+
55+
3. Addons: repository-level addons live in sibling repos. Each addon should follow the same shape: `remotehandlers` implementations, config schemas, tests, and an optional `AGENTS.md` describing the addon details (example: winrm addon).
56+
57+
## Contracts and data shapes — precise
58+
59+
Task manifest (canonical fields):
60+
61+
- `id` (string): unique task identifier
62+
- `type` (string): one of `transfer`, `execution`, `batch`
63+
- `source` / `destination` (objects): for transfers, each contains `hostname`, `directory`, `fileRegex`, and `protocol`
64+
- `hostname`, `directory`, `command` (for execution tasks)
65+
- `protocol` (object): minimally `{ "name": "<python-class-path>", "credentials": {...}, ... }`
66+
67+
Protocol object details:
68+
69+
- `name` (string): importable Python class path implementing a Transfer or Execution handler
70+
- `credentials` (object): fields are protocol-specific (e.g., `username`/`password`, `cert_pem`, `transport`)
71+
- `server_cert_validation` / `port` / `transport` are optional common fields used by multiple handlers
72+
73+
Handler interface expectations (implementations MUST):
74+
75+
- Transfer handlers expose: `list_files(spec)`, `pull_file(spec, dest)`, `push_file(spec, src)`, `move_file(spec)`, `delete_file(spec)`, plus bulk helpers like `pull_files_to_worker()` and `push_files_from_worker()`
76+
- Execution handlers expose: `execute(spec)` returning a controlled stream or result object, plus `kill(pid)` to request termination. Results must include `exit_code`, `stdout`, `stderr`. If a PID token is emitted by the remote, include `pid` in results.
77+
78+
Error model:
79+
80+
- Handlers should raise specific exceptions for common error classes (validation error, auth error, networkIO error). Taskhandler should catch and translate to standardized result objects for callers and tests.
81+
82+
If you change these shapes, update the JSON schemas in `src/opentaskpy/config/schemas/` and add/update tests in `tests/`.
83+
84+
## Variable resolution & templates
85+
86+
- File types: configuration and task payloads are JSON-based only. Files are either plain `.json` or Jinja2 templates with a `.json.j2` extension. YAML is not used for task/config payloads.
87+
- Pipeline: when a task/template file is loaded the system performs this pipeline:
88+
1. Read the `.json` or `.json.j2` file.
89+
90+
2. If it is a Jinja2 template (`.json.j2`), render it with the available context (variables, plugin helpers, and environment values).
91+
3. Parse the rendered output as JSON.
92+
4. Validate the parsed JSON against the appropriate schema in `src/opentaskpy/config/schemas/`.
93+
94+
- Template context and helpers: lookup plugins (see `src/opentaskpy/plugins/lookup`) and other small helpers/filters are available to templates to compute values at render time. Templates must render to valid JSON — agents should always validate rendered output before runtime.
95+
96+
- Guidance for agents:
97+
- When editing templates, ensure the rendered output is syntactically valid JSON (use a local render step in tests).
98+
- Do not introduce template constructs that rely on secrets stored in-repo; use environment variables or test fixtures for secret injection.
99+
- If new helpers/plugins are required by templates, add them under `src/opentaskpy/plugins/` and include unit tests that exercise rendering.
100+
101+
## Developer and agent workflow — run / test / iterate
102+
103+
Local dev quickstart (recommended):
104+
105+
1. Create and activate a virtual environment
106+
107+
```bash
108+
python -m venv .venv
109+
source .venv/bin/activate
110+
pip install -e .[test]
111+
```
112+
113+
2. Run a focused unit test
114+
115+
```bash
116+
pytest tests/test_file_helper.py::test_some_helper -q
117+
```
118+
119+
3. Run full unit test suite
120+
121+
```bash
122+
pytest tests/ -q
123+
```
124+
125+
Integration tests (requires docker):
126+
127+
```bash
128+
cd test
129+
./createTestDirectories.sh && ./createTestFiles.sh
130+
docker-compose up -d
131+
./setupSSHKeys.sh
132+
cd ..
133+
pytest tests/ -q
134+
```
135+
136+
CI notes:
137+
138+
- The project uses `pyproject.toml` for packaging and `pytest.ini` for test config. CI should install dependencies with `pip install -e .[test]` and run `pytest -q`.
139+
- Integration tests that depend on docker-compose should be gated behind a separate job that runs `cd test && docker-compose up -d` first.
140+
141+
## Concrete examples (copy-and-paste)
142+
143+
Example task manifest — execution
144+
145+
```json
146+
{
147+
"id": "task-123",
148+
"type": "execution",
149+
"hostname": "127.0.0.1",
150+
"directory": "/tmp",
151+
"command": "echo hello",
152+
"protocol": {
153+
"name": "ssh",
154+
"credentials": { "username": "test", "keyFile": "path/to/key" }
155+
}
156+
}
157+
```
158+
159+
Example transfer protocol snippet (schema-driven)
160+
161+
```json
162+
{
163+
"name": "sftp",
164+
"credentials": { "username": "user", "keyFile": "path/to/key" }
165+
}
166+
```
167+
168+
## Tests, debugging, and logs
169+
170+
- Test fixtures live in `tests/fixtures` or are defined in `tests/conftest.py`. Reuse existing fixtures whenever possible.
171+
- Integration test logs and artifacts created by `test/` helper scripts are placed under `test/testLogs/` for easy inspection.
172+
- Logging format: use `otflogging` helpers to include `task_id` and `hostname` in logs. New code should add context to loggers so tests can assert on log markers if needed.
173+
174+
Common debugging steps:
175+
176+
- Reproduce failing test locally with `-k <test_name>` and `-s` to see stdout/stderr streaming.
177+
- Inspect `test/testLogs/` for integration failures.
178+
- For networking/auth issues, replicate the protocol flow manually in a small script that uses the same handler class to connect and run a simple command.
179+
180+
## CI and release pointers
181+
182+
- Ensure `pyproject.toml` and `MANIFEST.in` contain any new package data files you add.
183+
- Bump versions according to semantic versioning and update `CHANGELOG.md` when releasing.
184+
- Unit tests should be quick; heavy integration tests should run in separate CI jobs that provision docker services.
185+
186+
## Best practices for automated agents (rules)
187+
188+
1. Run the unit tests that exercise your changed files before creating a PR. If you cannot reproduce remote integration locally, add/modify only unit tests or mock the remote layer.
189+
2. Never add secrets to the repo. Use environment variables or test fixtures that generate ephemeral keys.
190+
3. If changing a JSON schema, update the schema file and add at least one positive and one negative test case.
191+
4. Limit scope of edits in a single PR: small, focused changes are easier to review and test.
192+
193+
## Where to look for related code
194+
195+
- `src/opentaskpy/taskhandlers/taskhandler.py` and `src/opentaskpy/taskhandlers/`
196+
- `src/opentaskpy/remotehandlers/` (SSH, SFTP, WinRM addons live in separate repos but follow the same interface)
197+
- `src/opentaskpy/config/schemas/` — JSON schemas (canonical)
198+
- `tests/`, `tests/conftest.py` and `test/` helper scripts
199+
200+
## Change summary and contact
201+
202+
This file was created to give automated agents a reliable starting point for code navigation, safe edits, and test execution. If you're an external maintainer, open issues or PRs in this repository; include failing test output and the `-k` test used to reproduce locally.

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,18 @@
11
# Changelog
22

3+
# v26.15.0
4+
5+
- Add `OTF_STALE_RUNNING_LOG_SECONDS` environment variable to allow resuming of batches from a `_running` log file that is older than the specified number of seconds.
6+
- Update GPG key used for testing with a 10 year expiry to stop tests failing.
7+
- Bump minimum versions dependencies to:
8+
- `jsonpath-ng >= 1.8`
9+
- `jsonschema >= 4.26`
10+
- `paramiko >= 4.0`
11+
- `requests >= 2.33`
12+
- `referencing >= 0.37`
13+
- `tenacity >= 9.1`
14+
- `python-gnupg >= 0.5.6`
15+
316
# No release
417

518
- Update docs

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ These are some environment variables that can be used to customise the behaviour
182182
- `OTF_VARIABLES_FILE` - Override the default variables file. This is useful when you want to use the same job definitions, but point at a different environment with different for example. Multiple files can be specified comma-separated. If variables appear in more than one file, they will be resolved from the last entry found.
183183
- `OTF_PARAMIKO_ULTRA_DEBUG` - Enables the hidden `ultra_debug` option for Paramiko. This will log all SSH communications to the console, and can be very verbose, so be careful when using this. Set to `1` to enable (This is for SFTP only)
184184
- `OTF_LAZY_LOAD_VARIABLES` - Enables lazy loading of variables. This will only load variables that are used by the task definition. This can be useful if you have a large number of variables, and you only need a few of them.
185+
- `OTF_STALE_RUNNING_LOG_SECONDS` - When resuming a batch, this variable defines how long a `_running` log file must be inactive for before it's considered stale, and the batch will attempt to resume from it. This is to prevent resuming from a log file that is still being written to by a currently running batch. Default behaviour ignores all `_running` log files, and only resumes using `_failed` log files. Set to `0` to disable this check, and allow resuming from any log file regardless of last modification time, or something like `300` to resume a crashed batch that's at least 5 minutes old.
185186

186187
## Logging
187188

@@ -493,6 +494,8 @@ Each task in a batch has an `order_id`, this is a unique ID for each task, and i
493494

494495
As a batch task runs, it writes out the status of each sub task to it's log file. If a failure occurs, and the batch is rerun with the same arguments, it will attempt to resume from the point of failure. To determine the previous state, the batch handler will look at only logs that are from the current date. This is tp ensure that if something failed at 1am yesterday, but hasn't been rerun, we won't try to recover from the point of failure. Sometimes you might want to recover regardless, this can be done by passing in the date of the log files that you want to recover from, using the environment variable `OTF_BATCH_RESUME_LOG_DATE` in the format `YYYYMMDD`. This will instruct the batch handler to look at logs with that date instead.
495496

497+
By default a batch will only every resume from a `_failed` run. If for some reason you want to resume from a `_running` log file (perhaps you had a crash for some reason), you can set the environment variable `OTF_STALE_RUNNING_LOG_SECONDS` to the number of seconds a `_running` log file must be inactive for before it's considered stale. This will then cause the resume logic to read the `_running` log file if it's at least as old as the number of seconds specified.
498+
496499
# Development
497500

498501
This repo has been primarily configured to work with GitHub Codespaces devcontainers, though it can obviously be used directly on your machine too.

pyproject.toml

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "opentaskpy"
7-
version = "v26.10.0"
7+
version = "v26.15.0"
88
authors = [{ name = "Adam McDonagh", email = "adam@elitemonkey.net" }]
99
license-files = [ "LICENSE" ]
1010

@@ -25,13 +25,13 @@ keywords = [
2525
]
2626
dependencies = [
2727
"jinja2 >= 3.1",
28-
"jsonpath-ng >= 1.5",
29-
"jsonschema >= 4.17",
30-
"paramiko >= 3.0",
31-
"requests >= 2.28",
32-
"referencing >= 0.29.1",
33-
"tenacity >= 8.2.3",
34-
"python-gnupg >= 0.5.2",
28+
"jsonpath-ng >= 1.8",
29+
"jsonschema >= 4.26",
30+
"paramiko >= 4.0",
31+
"requests >= 2.33",
32+
"referencing >= 0.37",
33+
"tenacity >= 9.1",
34+
"python-gnupg >= 0.5.6",
3535
"omegaconf >= 2.3.0",
3636
]
3737
description = "A framework for automation execution of commands and transferring files between hosts"
@@ -42,14 +42,14 @@ requires-python = ">=3.11"
4242
dev = [
4343
"types-requests >=2.28",
4444
"types-paramiko >=3.0",
45-
"black == 26.3.0",
45+
"black == 26.3.1",
4646
"isort",
4747
"pytest",
4848
"bumpver",
4949
"pytest-shell",
5050
"lovely-pytest-docker",
5151
"pre-commit",
52-
"pylint >= 3.2.2",
52+
"pylint >= 4.0",
5353
"pydantic",
5454
"mypy",
5555
"ruff",
@@ -71,7 +71,7 @@ otf-batch-validator = "opentaskpy.cli.batch_validator:main"
7171
profile = 'black'
7272

7373
[tool.bumpver]
74-
current_version = "v26.10.0"
74+
current_version = "v26.15.0"
7575
version_pattern = "vYY.WW.PATCH[-TAG]"
7676
commit_message = "bump version {old_version} -> {new_version}"
7777
commit = true
@@ -385,8 +385,6 @@ ignore = [
385385
"D407", # Section name underlining
386386
"E501", # line too long
387387
"E731", # do not assign a lambda expression, use a def
388-
# Ignored due to performance: https://github.com/charliermarsh/ruff/issues/2923
389-
"UP038", # Use `X | Y` in `isinstance` call instead of `(X, Y)`
390388
]
391389

392390

src/opentaskpy/otflogging.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import os
66
import re
77
import threading
8+
import time
89
from datetime import datetime
910

1011
OTF_LOG_FORMAT = (
@@ -287,12 +288,18 @@ def get_latest_log_file(task_id: str, task_type: str) -> str | None:
287288
logs
288289
"""
289290
log_file_name = _define_log_file_name(task_id, task_type)
291+
292+
stale_running_log_secs = int(os.environ.get("OTF_STALE_RUNNING_LOG_SECONDS", -1))
293+
290294
# Obviously the date/time in the filename needs to be replaced with the latest
291295
# log file
292296
# Replace the prefix with a regex wildcard
293297
log_file_name = log_file_name.replace(os.environ["OTF_LOG_RUN_PREFIX"], ".*")
294-
# Also, we don't want to limit to running jobs, only failed or successful ones
295-
log_file_name = log_file_name.replace("_running", "(_failed)*")
298+
# Also, we don't want to limit to running jobs, only failed or successful ones, unless we're looking for stale logs
299+
log_file_name = log_file_name.replace(
300+
"_running",
301+
"(_failed)*" if stale_running_log_secs < 0 else "(_failed|_running)*",
302+
)
296303

297304
logger.debug(f"Looking for log file: {log_file_name}")
298305
if not os.path.exists(os.path.dirname(log_file_name)):
@@ -334,14 +341,35 @@ def get_latest_log_file(task_id: str, task_type: str) -> str | None:
334341
# Sort the list by the date/time in the filename
335342
log_files.sort(key=lambda x: datetime.strptime(x.split("_")[0], "%Y%m%d-%H%M%S.%f"))
336343
logger.debug(f"Log files after sorting: {log_files}")
344+
345+
# If we're looking for stale logs, then we want to filter out any _running logs that are NOT yet stale
346+
# (i.e. newer than stale_running_log_secs). We need to use the mtime of the file, not the timestamp
347+
# in the filename, since a live process keeps updating mtime whereas a crashed process stops.
348+
if stale_running_log_secs >= 0:
349+
log_files = [
350+
f
351+
for f in log_files
352+
if "_running" not in f
353+
or os.path.getmtime(os.path.join(os.path.dirname(log_file_name), f))
354+
<= time.time() - stale_running_log_secs
355+
]
356+
logger.debug(
357+
f"Log files after filtering out non-stale running logs: {log_files}"
358+
)
359+
337360
# Get the latest log file
338361
if log_files:
339362
log_file_name = f"{os.path.dirname(log_file_name)}/{log_files[-1]}"
340363
logger.info(f"Latest log file: {log_file_name}")
341-
# If the last log was a failure, return that, otherwise we just start from scratch, so return nothing
364+
# If the last log was a failure, return that
342365
if "_failed" in log_file_name:
343366
return log_file_name
367+
# If the last log is a stale running log, return it so the batch can resume from it
368+
if "_running" in log_file_name and stale_running_log_secs >= 0:
369+
logger.info("Stale running log file found. Resuming batch from it.")
370+
return log_file_name
344371

372+
# Otherwise we just start from scratch, so return nothing
345373
logger.info("No failed log file found. Starting from scratch.")
346374

347375
return None

0 commit comments

Comments
 (0)