Merged
26 changes: 13 additions & 13 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@ jobs:
$GITHUB_OUTPUT
- name: Restore Python virtual environment
id: cache-venv
uses: actions/cache@v4.2.0
uses: actions/cache@v5
with:
path: venv
key: >-
@@ -64,7 +64,7 @@ jobs:
hashFiles('.pre-commit-config.yaml') }}" >> $GITHUB_OUTPUT
- name: Restore pre-commit environment
id: cache-precommit
uses: actions/cache@v4.2.0
uses: actions/cache@v5
with:
path: ${{ env.PRE_COMMIT_CACHE }}
key: >-
@@ -92,15 +92,15 @@ jobs:
check-latest: true
- name: Restore Python virtual environment
id: cache-venv
uses: actions/cache@v4.2.0
uses: actions/cache@v5
with:
path: venv
fail-on-cache-miss: true
key: ${{ runner.os }}-${{ steps.python.outputs.python-version }}-${{
needs.prepare-base.outputs.python-key }}
- name: Restore pre-commit environment
id: cache-precommit
uses: actions/cache@v4.2.0
uses: actions/cache@v5
with:
path: ${{ env.PRE_COMMIT_CACHE }}
fail-on-cache-miss: true
@@ -130,15 +130,15 @@ jobs:
check-latest: true
- name: Restore Python virtual environment
id: cache-venv
uses: actions/cache@v4.2.0
uses: actions/cache@v5
with:
path: venv
fail-on-cache-miss: true
key: ${{ runner.os }}-${{ steps.python.outputs.python-version }}-${{
needs.prepare-base.outputs.python-key }}
- name: Restore pre-commit environment
id: cache-precommit
uses: actions/cache@v4.2.0
uses: actions/cache@v5
with:
path: ${{ env.PRE_COMMIT_CACHE }}
fail-on-cache-miss: true
@@ -165,15 +165,15 @@ jobs:
check-latest: true
- name: Restore Python virtual environment
id: cache-venv
uses: actions/cache@v4.2.0
uses: actions/cache@v5
with:
path: venv
fail-on-cache-miss: true
key: ${{ runner.os }}-${{ steps.python.outputs.python-version }}-${{
needs.prepare-base.outputs.python-key }}
- name: Restore pre-commit environment
id: cache-precommit
uses: actions/cache@v4.2.0
uses: actions/cache@v5
with:
path: ${{ env.PRE_COMMIT_CACHE }}
fail-on-cache-miss: true
@@ -204,15 +204,15 @@ jobs:
check-latest: true
- name: Restore Python virtual environment
id: cache-venv
uses: actions/cache@v4.2.0
uses: actions/cache@v5
with:
path: venv
fail-on-cache-miss: true
key: ${{ runner.os }}-${{ steps.python.outputs.python-version }}-${{
needs.prepare-base.outputs.python-key }}
- name: Restore pre-commit environment
id: cache-precommit
uses: actions/cache@v4.2.0
uses: actions/cache@v5
with:
path: ${{ env.PRE_COMMIT_CACHE }}
fail-on-cache-miss: true
@@ -243,15 +243,15 @@ jobs:
check-latest: true
- name: Restore Python virtual environment
id: cache-venv
uses: actions/cache@v4.2.0
uses: actions/cache@v5
with:
path: venv
fail-on-cache-miss: true
key: ${{ runner.os }}-${{ steps.python.outputs.python-version }}-${{
needs.prepare-base.outputs.python-key }}
- name: Restore pre-commit environment
id: cache-precommit
uses: actions/cache@v4.2.0
uses: actions/cache@v5
with:
path: ${{ env.PRE_COMMIT_CACHE }}
fail-on-cache-miss: true
@@ -315,7 +315,7 @@ jobs:
check-latest: true
- name: Restore Python virtual environment
id: cache-venv
uses: actions/cache@v4.2.0
uses: actions/cache@v5
with:
path: venv
fail-on-cache-miss: true
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
# Changelog

# v26.18.1

- Fix a memory leak in batches caused by remote handler objects not being tidied up after a task has completed.
- Fix a small potential memory leak in the logger, where loggers were not removed from the logger dictionary after being closed.
- Add `OTF_LOG_MEMORY_USAGE` environment variable to log RSS memory usage at each batch poll interval using the batch logger, which is useful for diagnosing memory growth in long-running batches. Added `psutil` as an optional dev dependency to support this; it must be installed manually, otherwise the log will report `-1.0` for RSS memory usage.

# v26.18.0

- Disable logging of initialisation events by default; this can be enabled by setting the env var `OTF_LOG_INIT_EVENTS=1`
2 changes: 2 additions & 0 deletions README.md
@@ -177,6 +177,8 @@ These are some environment variables that can be used to customise the behaviour
- `OTF_LOG_JSON` - Stderr logging will be in JSON format. Set to `1` to enable
- `OTF_LOG_DIRECTORY` - Path under which log files are written
- `OTF_NO_THREAD_SLEEP` - Disable the 1-second sleep between batch task thread creation. This sleep exists to prevent race conditions with concurrent protocol imports. Only disable this if you understand the implications. Set to `1` to disable.
- `OTF_BATCH_POLL_INTERVAL` - Interval in seconds between batch status-check loop iterations (default: `5`). Reducing this makes batches respond faster when tasks complete, at the cost of higher CPU polling overhead. **Intended for testing only — do not change in production unless you fully understand the trade-offs.**
- `OTF_LOG_MEMORY_USAGE` - Set to `1` to log the RSS memory usage of the OTF process at each batch poll interval using the batch logger. Useful for diagnosing memory growth in long-running batches. Requires `psutil` to be installed separately.
- `OTF_RUN_ID` - (meant for internal use) An aggregator for log files. When set, all log files for a run are grouped under this subdirectory. E.g. when running a batch, all execution and transfer logs are dropped into this subdirectory, rather than into a directory per task name. This is equivalent to the `-r`/`--runId` command line arguments, which are generally preferred.
- `OTF_SSH_KEY` - The private SSH key to use by default for all SSH connections. This is essential when using a basic docker container to trigger OTF. If not specified, it defaults to using any private SSH keys available to the user executing the application.
- `OTF_STAGING_DIR` - Staging base directory to place files before they're dropped into their final location. Default is `/tmp`
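The two new variables above can also be set programmatically before invoking OTF; a minimal sketch of a test-only configuration, assuming `psutil` has been installed separately as noted above:

```python
import os

# Enable RSS memory logging at each batch poll (needs psutil; without it
# OTF logs -1.0) and shorten the poll interval. The 2-second interval is
# a testing-only setting, per the warning above.
os.environ["OTF_LOG_MEMORY_USAGE"] = "1"
os.environ["OTF_BATCH_POLL_INTERVAL"] = "2"
```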
28 changes: 15 additions & 13 deletions docs/usage.md
@@ -114,19 +114,21 @@ When running a batch, setting `-r` (or `OTF_RUN_ID`) causes all sub-task logs to

The full list of supported environment variables:

| Variable | Description |
| --------------------------- | ---------------------------------------------------------------------------- |
| `OTF_SSH_KEY` | Default private SSH key path for all SSH connections |
| `OTF_LOG_DIRECTORY` | Base directory for log files |
| `OTF_NO_LOG` | Set to `1` to disable file logging |
| `OTF_LOG_JSON` | Set to `1` for JSON-formatted stderr logging |
| `OTF_STAGING_DIR` | Override the staging directory for file transfers (default: `/tmp`) |
| `OTF_RUN_ID` | Log aggregation identifier; equivalent to `-r` flag |
| `OTF_BATCH_RESUME_LOG_DATE` | Resume batch from a specific date's logs (`YYYYMMDD` format) |
| `OTF_VARIABLES_FILE` | Override the default variables file path. Comma-separate multiple files |
| `OTF_LAZY_LOAD_VARIABLES` | Set to `1` to only resolve variables that are referenced by the current task |
| `OTF_NO_THREAD_SLEEP` | Set to `1` to disable the 1-second sleep between batch task thread creation |
| `OTF_PARAMIKO_ULTRA_DEBUG` | Set to `1` to enable ultra-verbose Paramiko SSH debug output (SFTP only) |
| Variable | Description |
| --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `OTF_SSH_KEY` | Default private SSH key path for all SSH connections |
| `OTF_LOG_DIRECTORY` | Base directory for log files |
| `OTF_NO_LOG` | Set to `1` to disable file logging |
| `OTF_LOG_JSON` | Set to `1` for JSON-formatted stderr logging |
| `OTF_STAGING_DIR` | Override the staging directory for file transfers (default: `/tmp`) |
| `OTF_RUN_ID` | Log aggregation identifier; equivalent to `-r` flag |
| `OTF_BATCH_RESUME_LOG_DATE` | Resume batch from a specific date's logs (`YYYYMMDD` format) |
| `OTF_VARIABLES_FILE` | Override the default variables file path. Comma-separate multiple files |
| `OTF_LAZY_LOAD_VARIABLES` | Set to `1` to only resolve variables that are referenced by the current task |
| `OTF_NO_THREAD_SLEEP` | Set to `1` to disable the 1-second sleep between batch task thread creation |
| `OTF_BATCH_POLL_INTERVAL` | Interval in seconds between batch status-check iterations (default: `5`). **Testing only** — do not lower in production without understanding the trade-offs |
| `OTF_LOG_MEMORY_USAGE` | Set to `1` to log RSS memory usage at each batch poll interval via the batch logger. Requires `psutil` to be installed |
| `OTF_PARAMIKO_ULTRA_DEBUG` | Set to `1` to enable ultra-verbose Paramiko SSH debug output (SFTP only) |
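The poll-interval semantics in the table can be sketched as follows; the `poll_interval` helper here is hypothetical, while the variable name and the default of 5 seconds come from the table:

```python
import os

DEFAULT_BATCH_POLL_INTERVAL = 5  # seconds, the documented default

def poll_interval() -> float:
    """Resolve OTF_BATCH_POLL_INTERVAL, falling back to the default."""
    return float(os.environ.get("OTF_BATCH_POLL_INTERVAL", DEFAULT_BATCH_POLL_INTERVAL))

os.environ.pop("OTF_BATCH_POLL_INTERVAL", None)
unset_value = poll_interval()  # default applies when the variable is not set

os.environ["OTF_BATCH_POLL_INTERVAL"] = "0.5"
test_value = poll_interval()   # shorter interval for faster polling in tests
```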

## Overriding Variables at Runtime

5 changes: 3 additions & 2 deletions pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "opentaskpy"
version = "v26.18.0"
version = "v26.18.1"
authors = [{ name = "Adam McDonagh", email = "adam@elitemonkey.net" }]
license-files = [ "LICENSE" ]

@@ -43,6 +43,7 @@ dev = [
"types-requests >=2.28",
"types-paramiko >=3.0",
"black == 26.3.1",
"psutil >= 5.9",
"isort",
"pytest",
"bumpver",
@@ -71,7 +72,7 @@ otf-batch-validator = "opentaskpy.cli.batch_validator:main"
profile = 'black'

[tool.bumpver]
current_version = "v26.18.0"
current_version = "v26.18.1"
version_pattern = "vYY.WW.PATCH[-TAG]"
commit_message = "bump version {old_version} -> {new_version}"
commit = true
2 changes: 2 additions & 0 deletions src/opentaskpy/cli/task_run.py
@@ -30,6 +30,8 @@ def main() -> None:
OTF_LOG_LEVEL - Equivalent to using -v
OTF_LOG_INIT_EVENTS - Enable logging of log initialisation events. Set to 1 to enable
OTF_NO_THREAD_SLEEP - Disable the 1-second sleep between batch task thread creation. Set to 1 to disable
OTF_BATCH_POLL_INTERVAL - Interval in seconds between batch status-check iterations (default: 5). Intended for testing only; do not lower in production without understanding the trade-offs
OTF_LOG_MEMORY_USAGE - Set to 1 to log RSS memory usage at each batch poll interval via the batch logger. Requires psutil
OTF_SSH_KEY - Specify a particular SSH key to use for SSH/SFTP related transfers
OTF_STAGING_DIR - Staging base directory to place files before final location. Default is /tmp
OTF_BATCH_RESUME_LOG_DATE - Resume batch runs from a specific date in YYYYMMDD format
15 changes: 14 additions & 1 deletion src/opentaskpy/otflogging.py
@@ -403,6 +403,7 @@ def close_log_file(logger__: logging.Logger, result: bool = False) -> None:
log_file_name = handler.baseFilename

log_handlers = []
loggers_to_remove = []

if log_file_name:
new_log_filename = None
@@ -413,7 +414,7 @@ def close_log_file(logger__: logging.Logger, result: bool = False) -> None:

# Loop through every logger that exists and has a handler of this filename, and
# call the close method on it. Only the last one should rename the file
for logger_ in list(logging.Logger.manager.loggerDict.values()):
for logger_name, logger_ in list(logging.Logger.manager.loggerDict.items()):
if isinstance(logger_, logging.Logger):
for handler in logger_.handlers:
if (
@@ -422,6 +423,8 @@
):
log_handlers.append(handler)
handler.close()
loggers_to_remove.append(logger_name)
break

# Now everything is closed, we can rename the log file
# If result is True, then rename the file and remove _running from the name
@@ -437,6 +440,16 @@
for handler in log_handlers:
handler.baseFilename = new_log_filename

# Unregister all task-scoped loggers that wrote to this log file so they
# can be garbage collected rather than accumulating in the process.
for logger_name in loggers_to_remove:
logging.Logger.manager.loggerDict.pop(logger_name, None)

# Always unregister the logger passed in directly — this handles the case
# where OTF_NO_LOG is set and no file handler was attached, so the
# loggers_to_remove list above would be empty.
logging.Logger.manager.loggerDict.pop(logger__.name, None)


def redact(log_message: str) -> str:
"""Redact any sensitive information from the log message.
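The logger-dictionary cleanup added in `close_log_file` above can be illustrated in isolation; a minimal sketch (the logger name is hypothetical) of popping a closed logger from `logging.Logger.manager.loggerDict` so the object can be garbage collected:

```python
import logging

name = "otf.task.example-task"  # hypothetical task-scoped logger name
logger = logging.getLogger(name)
assert name in logging.Logger.manager.loggerDict

# Close and detach any handlers, then unregister the logger itself so it
# can be garbage collected instead of living for the whole process — the
# leak this PR fixes.
for handler in list(logger.handlers):
    handler.close()
    logger.removeHandler(handler)
logging.Logger.manager.loggerDict.pop(name, None)
```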
27 changes: 25 additions & 2 deletions src/opentaskpy/taskhandlers/batch.py
@@ -1,5 +1,6 @@
"""Batch task handler class."""

import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait
@@ -18,6 +19,17 @@
DEFAULT_TASK_EXIT_CODE = 0
TASK_TYPE = "B"
BATCH_TASK_LOG_MARKER = "__OTF_BATCH_TASK_MARKER__"
DEFAULT_BATCH_POLL_INTERVAL = 5


def _rss_mb() -> float:
"""Return the RSS of the current process in MB, or -1 if psutil is unavailable."""
try:
import psutil # pylint: disable=import-outside-toplevel

return float(psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024))
except Exception: # pylint: disable=broad-except
return -1.0


class Batch(TaskHandler):
@@ -403,8 +415,15 @@ def run(self, kill_event: threading.Event | None = None) -> bool:  # noqa: C901
)
break

# Sleep 5 seconds before checking again
time.sleep(5)
# Sleep before checking again
poll_interval = float(
environ.get("OTF_BATCH_POLL_INTERVAL", DEFAULT_BATCH_POLL_INTERVAL)
)
time.sleep(poll_interval)

# Log memory usage if requested
if environ.get("OTF_LOG_MEMORY_USAGE"):
self.logger.info(f"[memory] RSS: {_rss_mb():.1f} MB")

# Check if we have been asked to kill the batch
if kill_event and kill_event.is_set():
@@ -483,6 +502,10 @@ def task_runner(self, batch_task: dict, event: threading.Event) -> None:
batch_task["batch_task_spec"]["order_id"],
batch_task["task_id"],
)
# Release the task handler so that remote handler objects
# (boto3 clients, SSH connections, etc.) can be garbage
# collected rather than accumulating for the whole batch run.
batch_task["task_handler"] = None
break

# Check if we have been asked to kill the thread
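The `_rss_mb()` helper added above degrades gracefully when `psutil` is missing, returning the `-1.0` sentinel rather than raising. A self-contained sketch of the same pattern:

```python
import os

def rss_mb() -> float:
    """Current process RSS in MB, or -1.0 when psutil is not installed."""
    try:
        import psutil  # optional dependency, per the pyproject dev extras
    except ImportError:
        return -1.0
    return float(psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024))

# Either a real measurement or the -1.0 sentinel the batch logger prints.
value = rss_mb()
```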